hbase-commits mailing list archives

From li...@apache.org
Subject svn commit: r1524800 [1/2] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbas...
Date Thu, 19 Sep 2013 18:18:18 GMT
Author: liyin
Date: Thu Sep 19 18:18:17 2013
New Revision: 1524800

URL: http://svn.apache.org/r1524800
Log:
[HBASE-9102]  Blocking scan preload

Author: mahmoudeariby

Summary:
This diff adds a scan preload feature that works in a blocking fashion.
For each storefile scanner we try to keep a sliding window of blocks in the block cache (by preloading them in the background).
More about this feature is described here:
https://our.intern.facebook.com/intern/wiki/index.php/HBase/Scan_preload
This diff removes the non-blocking preloading from D889314 and includes more fixes.
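
As a client-side usage sketch (not part of this commit): the Scan.setPreloadBlocks() and HTable.getLocalScanner() calls below are the ones added/used in this diff, while the table handle, family, and row keys are placeholders.

    Scan scan = new Scan(Bytes.toBytes("startRow"), Bytes.toBytes("stopRow"));
    scan.addFamily(Bytes.toBytes("cf"));
    scan.setPreloadBlocks(true); // storefile scanners will preload blocks ahead of the scan
    ResultScanner scanner = table.getLocalScanner(scan, false);
    try {
      for (Result result : scanner) {
        // consume rows; a sliding window of blocks is preloaded in the background
      }
    } finally {
      scanner.close();
    }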

Test Plan:
Run TestFromClientSide4, which tests the feature.
Run all other unit tests to make sure nothing broke.
Test on devcluster.

Reviewers: liyintang, rshroff, manukranthk

Reviewed By: liyintang

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D971692

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/PreloadThreadPool.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/ScanSearch.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide4.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheKeyWithEncoding.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderV1.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Sep 19 18:18:17 2013
@@ -380,6 +380,25 @@ public final class HConstants {
   public static final String HREGIONSERVER_ENABLE_SERVERSIDE_PROFILING =
       "hbase.regionserver.enable.serverside.profiling";
 
+  /** Conf key for the number of blocks to preload when preloading is enabled for a scanner */
+  public static final String SCAN_PRELOAD_BLOCK_COUNT =
+      "hbase.regionserver.preload.block.count";
+  /** Default number of blocks to preload during a sequential scan of an hfile (if enabled) */
+  public static final int DEFAULT_PRELOAD_BLOCK_COUNT = 64;
+  /** Conf key for the core preload threads */
+  public static final String CORE_PRELOAD_THREAD_COUNT = "hbase.regionserver.core.preload.thread.count";
+  /** Default number of core preload threads per region server */
+  public static final int DEFAULT_CORE_PRELOAD_THREAD_COUNT = 1;
+  /** Conf key for the max preload threads */
+  public static final String MAX_PRELOAD_THREAD_COUNT = "hbase.regionserver.max.preload.thread.count";
+  /** Default maximum number of preload threads per region server */
+  public static final int DEFAULT_MAX_PRELOAD_THREAD_COUNT = 64;
+  /** Conf key for max preload blocks kept in cache per hfilescanner */
+  public static final String MAX_PRELOAD_BLOCKS_KEPT_IN_CACHE =
+      "hbase.regionserver.preload.blocks.kept.in.cache";
+  /** Default maximum number of preload blocks to keep in block cache per hfilescanner */
+  public static final int DEFAULT_MAX_PRELOAD_BLOCKS_KEPT_IN_CACHE = 128;
+  
   // Always store the location of the root table's HRegion.
   // This HRegion is never split.
 

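A configuration sketch for the new keys (the values shown are just the defaults defined above):

    Configuration conf = HBaseConfiguration.create();
    // Sliding-window size: number of blocks preloaded ahead per storefile scanner
    conf.setInt(HConstants.SCAN_PRELOAD_BLOCK_COUNT, HConstants.DEFAULT_PRELOAD_BLOCK_COUNT);
    // Preload thread pool sizing per region server
    conf.setInt(HConstants.CORE_PRELOAD_THREAD_COUNT, HConstants.DEFAULT_CORE_PRELOAD_THREAD_COUNT);
    conf.setInt(HConstants.MAX_PRELOAD_THREAD_COUNT, HConstants.DEFAULT_MAX_PRELOAD_THREAD_COUNT);
    // How many preloaded blocks may stay in the block cache per hfile scanner before eviction
    conf.setInt(HConstants.MAX_PRELOAD_BLOCKS_KEPT_IN_CACHE,
        HConstants.DEFAULT_MAX_PRELOAD_BLOCKS_KEPT_IN_CACHE);
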
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Sep 19 18:18:17 2013
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.NotServin
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.PreloadThreadPool;
 import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 import org.apache.hadoop.hbase.ipc.ProfilingData;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -606,6 +607,17 @@ public class HTable implements HTableInt
    */
   public ResultScanner getLocalScanner(final Scan scan,
       boolean createNewHardlinks) throws IOException {
+    // Construct the preload thread pool in case this is a preloading scanner.
+    // We obtain the thread counts from the HTable's configuration. TODO (is this the right place?)
+    if (scan.isPreloadBlocks()) {
+      int minimum =
+          getConfiguration().getInt(HConstants.CORE_PRELOAD_THREAD_COUNT,
+            HConstants.DEFAULT_CORE_PRELOAD_THREAD_COUNT);
+      int maximum =
+          getConfiguration().getInt(HConstants.MAX_PRELOAD_THREAD_COUNT,
+            HConstants.DEFAULT_MAX_PRELOAD_THREAD_COUNT);
+      PreloadThreadPool.constructPreloaderThreadPool(minimum, maximum);
+    }
     ClientLocalScanner s =
         new ClientLocalScanner(scan, this, createNewHardlinks);
     s.initialize();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java Thu Sep 19 18:18:17 2013
@@ -89,7 +89,8 @@ public class Scan extends Operation impl
   private static final byte RESPONSE_SIZE_VERSION = (byte)4;
   private static final byte FLASHBACK_VERSION = (byte) 5;
   private static final byte PREFETCH_VERSION = (byte) 6;
-  private static final byte SCAN_VERSION = PREFETCH_VERSION;
+  private static final byte PRELOAD_VERSION = (byte) 7;
+  private static final byte SCAN_VERSION = PRELOAD_VERSION;
 
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow  = HConstants.EMPTY_END_ROW;
@@ -108,6 +109,8 @@ public class Scan extends Operation impl
   private Map<byte [], NavigableSet<byte []>> familyMap =
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
   private long effectiveTS = HConstants.LATEST_TIMESTAMP;
+  /*This tells whether the scanner will preload blocks or not*/
+  private boolean preloadBlocks = false;
 
   /**
    * Create a Scan operation across all rows.
@@ -253,6 +256,22 @@ public class Scan extends Operation impl
   }
 
   /**
+   * This tells whether this scanner will preload data blocks or not.
+   * @return status of preload data blocks
+   */
+  public boolean isPreloadBlocks() {
+    return preloadBlocks;
+  }
+  
+  /**
+   * Set whether this scanner should preload data blocks or not.
+   * @param value true if the scanner should preload data blocks
+   */
+  public void setPreloadBlocks(boolean value) {
+    preloadBlocks = value;
+  }
+
+  /**
    * Get versions of columns with the specified timestamp. Note, default maximum
    * versions to return is 1.  If your time range spans more than one version
    * and you want all versions returned, up the number of versions beyond the
@@ -700,7 +719,10 @@ public class Scan extends Operation impl
     }
     this.caching = in.readInt();
     this.cacheBlocks = in.readBoolean();
-    if(in.readBoolean()) {
+    if (version >= PRELOAD_VERSION) {
+      this.preloadBlocks = in.readBoolean();
+    }
+    if (in.readBoolean()) {
       this.filter = (Filter)createForName(Bytes.toString(Bytes.readByteArray(in)));
       this.filter.readFields(in);
     }
@@ -726,7 +748,9 @@ public class Scan extends Operation impl
     // We try to talk a protocol version as low as possible so that we can be
     // backward compatible as far as possible.
     byte version = (byte) 1;
-    if (serverPrefetching) {
+    if (preloadBlocks) {
+      version = PRELOAD_VERSION;
+    } else if (serverPrefetching) {
       version = PREFETCH_VERSION;
     } else if (effectiveTS != HConstants.LATEST_TIMESTAMP) {
       version = FLASHBACK_VERSION;
@@ -762,6 +786,9 @@ public class Scan extends Operation impl
     }
     out.writeInt(this.caching);
     out.writeBoolean(this.cacheBlocks);
+    if (version >= PRELOAD_VERSION) {
+      out.writeBoolean(this.preloadBlocks);
+    }
     if(this.filter == null) {
       out.writeBoolean(false);
     } else {

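To see the version negotiation end to end, a round-trip sketch (assuming Hadoop's DataOutputBuffer/DataInputBuffer): setting the preload flag forces PRELOAD_VERSION on the wire, and readFields() only consumes the extra boolean when the version allows it.

    Scan out = new Scan();
    out.setPreloadBlocks(true); // write() will now emit PRELOAD_VERSION (7)
    DataOutputBuffer dob = new DataOutputBuffer();
    out.write(dob);
    DataInputBuffer dib = new DataInputBuffer();
    dib.reset(dob.getData(), dob.getLength());
    Scan in = new Scan();
    in.readFields(dib); // version >= PRELOAD_VERSION, so the preload flag is read back
    assert in.isPreloadBlocks();
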
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java Thu Sep 19 18:18:17 2013
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -80,8 +81,9 @@ public class HalfStoreFileReader extends
 
   @Override
   public HFileScanner getScanner(final boolean cacheBlocks,
-                                final boolean isCompaction) {
-    final HFileScanner s = super.getScanner(cacheBlocks, isCompaction);
+      final boolean isCompaction, final boolean preloadBlocks) {
+    final HFileScanner s =
+        super.getScanner(cacheBlocks, isCompaction, preloadBlocks);
     return new HFileScanner() {
       final HFileScanner delegate = s;
       public boolean atEnd = false;
@@ -250,6 +252,11 @@ public class HalfStoreFileReader extends
       public boolean currKeyValueObtainedFromCache() {
         return this.delegate.currKeyValueObtainedFromCache();
       }
+
+      @Override
+      public void close() {
+        s.close();
+      }
     };
   }
 
@@ -259,7 +266,7 @@ public class HalfStoreFileReader extends
       return super.getLastKey();
     }
     // Get a scanner that caches the block and that uses pread.
-    HFileScanner scanner = getScanner(true, false);
+    HFileScanner scanner = getScanner(true, false, false);
     try {
       if (scanner.seekBefore(this.splitkey)) {
         return Bytes.toBytes(scanner.getKey());

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java Thu Sep 19 18:18:17 2013
@@ -187,7 +187,7 @@ public abstract class AbstractHFileReade
    */
   @Override
   public HFileScanner getScanner(boolean cacheBlocks) {
-    return getScanner(cacheBlocks, false);
+    return getScanner(cacheBlocks, false, false);
   }
 
   /**
@@ -315,7 +315,7 @@ public abstract class AbstractHFileReade
     }
 
     @Override
-    public boolean isSeeked(){
+    public boolean isSeeked() {
       return blockBuffer != null;
     }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java Thu Sep 19 18:18:17 2013
@@ -52,8 +52,11 @@ public class BlockCacheKey implements He
 
   @Override
   public int hashCode() {
-    return hfileName.hashCode() * 127 + (int) (offset ^ (offset >>> 32)) +
-        encoding.ordinal() * 17;
+    return hfileName.hashCode() * 127 + (int) (offset ^ (offset >>> 32));
+    // The encoding term (encoding.ordinal() * 17) was dropped: there's no reason for the
+    // encoding to be in the cache key, as the encoding already exists in the block itself,
+    // and encoding does not contribute to the equals(Object o) method.
   }
 
   @Override

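The motivation is the equals()/hashCode() contract: keys that differ only in encoding compare equal, so they must also hash identically. A sketch (constructor shape taken from call sites elsewhere in this diff; the encodings are illustrative):

    BlockCacheKey a = new BlockCacheKey("hfile", 0L, DataBlockEncoding.PREFIX, BlockType.DATA);
    BlockCacheKey b = new BlockCacheKey("hfile", 0L, DataBlockEncoding.NONE, BlockType.DATA);
    assert a.equals(b);                  // encoding does not participate in equals()
    assert a.hashCode() == b.hashCode(); // holds after this change, restoring the contract
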
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Thu Sep 19 18:18:17 2013
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -431,8 +432,22 @@ public class HFile {
 
   /** An abstraction used by the block index */
   public interface CachingBlockReader {
+    /**
+     * Read in a file block.
+     * @param offset offset of the block to read
+     * @param onDiskBlockSize size of the block
+     * @param cacheBlock whether to cache the block on read
+     * @param isCompaction is this block being read as part of a compaction
+     * @param cacheOnPreload should we put this block into the cache because we are preloading it
+     * @param expectedBlockType the block type we are expecting to read with this read operation, or
+     *          null to read whatever block type is available and avoid checking (that might reduce
+     *          caching efficiency of encoded data blocks)
+     * @param kvContext context recording whether the block was obtained from the cache
+     * @return the HFileBlock that was read
+     * @throws IOException
+     */
     HFileBlock readBlock(long offset, long onDiskBlockSize,
-        boolean cacheBlock, final boolean isCompaction,
+        boolean cacheBlock, final boolean isCompaction, boolean cacheOnPreload,
         BlockType expectedBlockType, KeyValueContext kvContext)
         throws IOException;
   }
@@ -450,7 +465,8 @@ public class HFile {
 
     RawComparator<byte []> getComparator();
 
-    HFileScanner getScanner(boolean cacheBlocks, final boolean isCompaction);
+    HFileScanner getScanner(boolean cacheBlocks, final boolean isCompaction,
+        boolean preloadBlocks);
 
     ByteBuffer getMetaBlock(String metaBlockName,
        boolean cacheBlock) throws IOException;

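A call sketch through the widened CachingBlockReader interface (the argument pattern mirrors the HFileReaderV2 call sites later in this diff; offset and kvContext are placeholders):

    HFileBlock block = cachingBlockReader.readBlock(
        offset,          // offset of the block to read
        -1,              // onDiskBlockSize unknown; the reader determines it
        true,            // cacheBlock: cache on read
        false,           // isCompaction: ordinary scan read
        false,           // cacheOnPreload: not a preloader-initiated read
        BlockType.DATA,  // expectedBlockType
        kvContext);      // records whether the block came from cache
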
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java Thu Sep 19 18:18:17 2013
@@ -259,7 +259,7 @@ public class HFileBlockIndex {
             expectedBlockType = BlockType.DATA;
           }
           block = cachingBlockReader.readBlock(currentOffset,
-              currentOnDiskSize, shouldCache, isCompaction,
+              currentOnDiskSize, shouldCache, isCompaction, false,
               expectedBlockType, kvContext);
         }
 
@@ -334,9 +334,9 @@ public class HFileBlockIndex {
         }
 
         // Caching, using pread, assuming this is not a compaction.
-        HFileBlock midLeafBlock = cachingBlockReader.readBlock(
-            midLeafBlockOffset, midLeafBlockOnDiskSize, true, false,
-            BlockType.LEAF_INDEX, null);
+        HFileBlock midLeafBlock =
+            cachingBlockReader.readBlock(midLeafBlockOffset, midLeafBlockOnDiskSize, true, false,
+              false, BlockType.LEAF_INDEX, null);
 
         ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
         int numDataBlocks = b.getInt();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java Thu Sep 19 18:18:17 2013
@@ -224,7 +224,7 @@ public class HFilePrettyPrinter {
 
     if (printKey || checkRow || checkFamily) {
       // scan over file and read key/value's, performing any requested checks
-      HFileScanner scanner = reader.getScanner(false, false);
+      HFileScanner scanner = reader.getScanner(false, false, false);
       if (this.isSeekToRow) {
         // seek to the first kv on this row
         scanner.seekTo(KeyValue.createFirstOnRow(this.row).getKey());

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java Thu Sep 19 18:18:17 2013
@@ -170,7 +170,8 @@ public class HFileReaderV1 extends Abstr
    * @return Scanner on this file.
    */
   @Override
-  public HFileScanner getScanner(boolean cacheBlocks, final boolean isCompaction) {
+  public HFileScanner getScanner(boolean cacheBlocks,
+      final boolean isCompaction, boolean preloadBlocks) {
     return new ScannerV1(this, cacheBlocks, isCompaction);
   }
 
@@ -501,6 +502,15 @@ public class HFileReaderV1 extends Abstr
       blockSeek(key, offset, length, true);
       return true;
     }
+    
+    @Override
+    public void close() {
+      /*
+       * Note this function is overridden because we need it in the HFileScanner interface, so
+       * that the store scanner can delegate the close operation (which is required when block
+       * preloading is enabled) to the HFileScanner it owns.
+       */
+    }
   }
 
   /**
@@ -699,13 +709,13 @@ public class HFileReaderV1 extends Abstr
     public boolean currKeyValueObtainedFromCache() {
       return this.kvContext.getObtainedFromCache();
     }
-
+    
   }
 
   @Override
-  public HFileBlock readBlock(long offset, long onDiskBlockSize,
-      boolean cacheBlock, boolean isCompaction,
-      BlockType expectedBlockType, KeyValueContext kvContext) {
+  public HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock,
+      boolean isCompaction, boolean cacheOnPreload, BlockType expectedBlockType,
+      KeyValueContext kvContext) {
     throw new UnsupportedOperationException();
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Thu Sep 19 18:18:17 2013
@@ -24,7 +24,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,6 +44,8 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
 import org.apache.hadoop.hbase.ipc.ProfilingData;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.BlockMetricType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.io.WritableUtils;
@@ -59,11 +64,16 @@ public class HFileReaderV2 extends Abstr
   private static final int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
 
   private boolean includesMemstoreTS = false;
-
+  
+  /** Number of blocks that we'll preload if preloading is enabled */
+  private int preloadBlockCount;
+  /** maximum number of preload blocks that we'll keep in the block cache */
+  private int preloadBlocksKeptInCache;
+  
   private boolean shouldIncludeMemstoreTS() {
     return includesMemstoreTS;
   }
-
+  
   /**
    * A "sparse lock" implementation allowing to lock on a particular block
    * identified by offset. The purpose of this is to avoid two clients loading
@@ -71,7 +81,7 @@ public class HFileReaderV2 extends Abstr
    * cache.
    */
   private IdLock offsetLock = new IdLock();
-
+  
   /**
    * Blocks read from the load-on-open section, excluding data root index, meta
    * index, and file info.
@@ -153,6 +163,12 @@ public class HFileReaderV2 extends Abstr
     while ((b = blockIter.nextBlock()) != null) {
       loadOnOpenBlocks.add(b);
     }
+    preloadBlockCount =
+        conf.getInt(HConstants.SCAN_PRELOAD_BLOCK_COUNT,
+          HConstants.DEFAULT_PRELOAD_BLOCK_COUNT);
+    preloadBlocksKeptInCache =
+        conf.getInt(HConstants.MAX_PRELOAD_BLOCKS_KEPT_IN_CACHE,
+          HConstants.DEFAULT_MAX_PRELOAD_BLOCKS_KEPT_IN_CACHE);
   }
 
   /**
@@ -166,14 +182,14 @@ public class HFileReaderV2 extends Abstr
    * @return Scanner on this file.
    */
   @Override
-  public HFileScanner getScanner(boolean cacheBlocks, final boolean isCompaction) {
+  public HFileScanner getScanner(boolean cacheBlocks, final boolean isCompaction,
+      boolean preloadBlocks) {
     // check if we want to use data block encoding in memory
     if (dataBlockEncoder.useEncodedScanner(isCompaction)) {
       return new EncodedScannerV2(this, cacheBlocks, isCompaction,
-          includesMemstoreTS);
+          includesMemstoreTS, preloadBlocks);
     }
-
-    return new ScannerV2(this, cacheBlocks, isCompaction);
+    return new ScannerV2(this, cacheBlocks, isCompaction, preloadBlocks);
   }
 
   /**
@@ -249,6 +265,7 @@ public class HFileReaderV2 extends Abstr
    * @param onDiskBlockSize size of the block
    * @param cacheBlock
    * @param isCompaction is this block being read as part of a compaction
+   * @param cacheOnPreload should we cache this block because we are preloading
    * @param expectedBlockType the block type we are expecting to read with this
    *          read operation, or null to read whatever block type is available
    *          and avoid checking (that might reduce caching efficiency of
@@ -259,8 +276,14 @@ public class HFileReaderV2 extends Abstr
    */
   public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
       final boolean cacheBlock, final boolean isCompaction,
-      BlockType expectedBlockType, KeyValueContext kvContext)
-      throws IOException {
+      boolean cacheOnPreload, BlockType expectedBlockType,
+      KeyValueContext kvContext) throws IOException {
+    /*
+     * Time at which we entered the function. For metrics we use readTime as the whole time
+     * spent in this function, not just the time spent reading from disk; this is to take extra
+     * cache hits into consideration.
+     */
+    long startTime = System.currentTimeMillis();
     if (dataBlockIndexReader == null) {
       throw new IOException("Block index not loaded");
     }
@@ -281,10 +304,10 @@ public class HFileReaderV2 extends Abstr
         new BlockCacheKey(name, dataBlockOffset,
             dataBlockEncoder.getEffectiveEncodingInCache(isCompaction),
             expectedBlockType);
-
     // Checking the block cache.
-    HFileBlock cachedBlock = this.getCachedBlock(cacheKey, cacheBlock, isCompaction,
-        expectedBlockType, true);
+    HFileBlock cachedBlock =
+        this.getCachedBlock(cacheKey, cacheBlock, isCompaction,
+          expectedBlockType);
     if (cachedBlock != null) {
       if (kvContext != null) {
         if (LOG.isTraceEnabled()) {
@@ -293,18 +316,32 @@ public class HFileReaderV2 extends Abstr
         }
         kvContext.setObtainedFromCache(true);
       }
+      // update schema metrics
+      getSchemaMetrics().updateOnBlockRead(
+        cachedBlock.getBlockType().getCategory(), isCompaction,
+        System.currentTimeMillis() - startTime, cacheOnPreload, true);
+      // update profiling data
+      Call call = HRegionServer.callContext.get();
+      ProfilingData pData = call == null ? null : call.getProfilingData();
+      if (pData != null) {
+        pData.incInt(ProfilingData.blockHitCntStr(cachedBlock.getBlockType()
+            .getCategory(), cachedBlock.getColumnFamilyName()));
+      }
       return cachedBlock;
     }
-
     IdLock.Entry lockEntry = offsetLock.getLockEntry(dataBlockOffset);
     try {
       // Double checking the block cache again within the IdLock
-      cachedBlock = this.getCachedBlock(cacheKey, cacheBlock, isCompaction,
-          expectedBlockType, false);
+      cachedBlock =
+          this.getCachedBlock(cacheKey, cacheBlock, isCompaction,
+            expectedBlockType);
       if (cachedBlock != null) {
         if (kvContext != null) {
           kvContext.setObtainedFromCache(true);
         }
+        getSchemaMetrics().updateOnBlockRead(
+          cachedBlock.getBlockType().getCategory(), isCompaction,
+          System.currentTimeMillis() - startTime, cacheOnPreload, true);
         return cachedBlock;
       }
       // First, check if the block exists in L2 cache
@@ -334,6 +371,9 @@ public class HFileReaderV2 extends Abstr
           cacheConf.getBlockCache().cacheBlock(cacheKey, cachedBlock,
               cacheConf.isInMemory());
         }
+        getSchemaMetrics().updateOnBlockRead(
+          cachedBlock.getBlockType().getCategory(), isCompaction,
+          System.currentTimeMillis() - startTime, cacheOnPreload, true);
         // Return early if a block exists in the L2 cache
         return cachedBlock;
       }
@@ -362,15 +402,16 @@ public class HFileReaderV2 extends Abstr
       if (kvContext != null) {
         kvContext.setObtainedFromCache(false);
       }
-      getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction,
-          TimeUnit.NANOSECONDS.toMillis(deltaNs));
+      getSchemaMetrics().updateOnBlockRead(
+       blockCategory, isCompaction,
+        System.currentTimeMillis() - startTime, cacheOnPreload, false);
 
       // Cache the block if necessary
-
-      if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
-              hfileBlock.getBlockType().getCategory())) {
+      if (cacheOnPreload
+          || (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock
+              .getBlockType().getCategory()))) {
         cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
-            cacheConf.isInMemory());
+          cacheConf.isInMemory());
       }
       Call call = HRegionServer.callContext.get();
       ProfilingData pData = call == null ? null : call.getProfilingData();
@@ -401,7 +442,7 @@ public class HFileReaderV2 extends Abstr
   }
 
   private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock,
-      boolean isCompaction, BlockType expectedBlockType, boolean updateMetrics) throws IOException {
+      boolean isCompaction, BlockType expectedBlockType) throws IOException {
     // Check cache for block. If found return.
     if (cacheConf.isBlockCacheEnabled()) {
       HFileBlock cachedBlock = (HFileBlock)
@@ -419,21 +460,6 @@ public class HFileReaderV2 extends Abstr
               "has wrong encoding: " + cachedBlock.getDataBlockEncoding() +
               " (expected: " + dataBlockEncoder.getEncodingInCache() + ")");
         }
-
-        // Update the metrics if enabled
-        if (updateMetrics) {
-          BlockCategory blockCategory =
-            cachedBlock.getBlockType().getCategory();
-          getSchemaMetrics().updateOnCacheHit(blockCategory, isCompaction);
-
-          Call call = HRegionServer.callContext.get();
-          ProfilingData pData = call == null ? null : call.getProfilingData();
-          if (pData != null) {
-            pData.incInt(ProfilingData.blockHitCntStr(
-                cachedBlock.getBlockType().getCategory(),
-                cachedBlock.getColumnFamilyName()));
-          }
-        }
         return cachedBlock;
       }
     }
@@ -502,7 +528,7 @@ public class HFileReaderV2 extends Abstr
           "but got " + actualBlockType + ": " + block);
     }
   }
-
+  
   /**
    * @return Last key in the file. May be null if file has no entries. Note that
    *         this is not the last row key, but rather the byte form of the last
@@ -551,17 +577,15 @@ public class HFileReaderV2 extends Abstr
             + " block(s) from L2 cache");
       }
     }
-
     if (closeIStream && istream != null) {
       istream.close();
       istream = null;
     }
   }
-
+  
   protected abstract static class AbstractScannerV2
       extends AbstractHFileReader.Scanner {
     protected HFileBlock block;
-
     /**
      * The next indexed key is to keep track of the indexed key of the next data block.
      * If the nextIndexedKey is HConstants.NO_NEXT_INDEXED_KEY, it means that the
@@ -570,10 +594,272 @@ public class HFileReaderV2 extends Abstr
      * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet.
      */
     protected byte[] nextIndexedKey;
-
+    
+    static final boolean ON_PRELOAD = true;
+    boolean preloadBlocks;
+    int scanPreloadBlocksCount;
+    int scanPreloadBlocksKeptInCache;
+   
+    /** Responsible for creating Block Preloaders in a blocking manner */
+    private BlockingPreloadManager blockManager;
+    
+    private HFileReaderV2 hfileReaderV2;
+    
     public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
-        final boolean isCompaction) {
+        final boolean isCompaction, boolean preloadBlocks) {
       super(r, cacheBlocks, isCompaction);
+      hfileReaderV2 = r;
+      scanPreloadBlocksCount = r.preloadBlockCount;
+      scanPreloadBlocksKeptInCache = r.preloadBlocksKeptInCache;
+      this.preloadBlocks = preloadBlocks;
+      if (preloadBlocks) {
+        blockManager = new BlockingPreloadManager();
+      }
+    }
+
+    
+    // TODO aggregate all these metrics across the whole regionserver
+    // TODO extend these to per-scanner metrics whether preloading is on or off,
+    // including read times (in microseconds) and more info about scanner behaviour in general
+    class ScanPreloadMetrics {
+      long lastRequestOffset;
+      long seekBacks;
+      long seekBackNonData;
+      long indexBlocksRead;
+      long inWrongPlaceCount;
+      long emptyQueue;
+      long ready;
+
+      public String toString() {
+        return "Ready = " + ready + ", Last requested offset = "
+            + lastRequestOffset + ", Seek back count = " + seekBacks
+            + ", Seek back for non data blocks = " + seekBackNonData
+            + ", Index blocks read = " + indexBlocksRead
+            + ", In a wrong place = " + inWrongPlaceCount
+            + ", Empty queue = " + emptyQueue
+            + ", Index blocks read by preloader " + indexBlocksRead;
+      }
+    }
+    
+    /*
+     * This is responsible for placing blocking preload requests. Whenever a block is requested
+     * by the scanner (other than those requested by BlockIndexReader), the request goes through
+     * this object. If the requested block is ready, it is read directly from the block cache;
+     * otherwise we wait for ongoing requests to complete, request the block, and wait for it.
+     */
+    class BlockingPreloadManager {
+      // This controls the critical section where the preloader is preloading in
+      ReentrantLock lock;
+      // This queue contains those blocks for which preload was attempted.
+      LinkedBlockingQueue<Long> preloadAttempted;
+      // This queue is to keep blocks that should be evicted
+      LinkedBlockingQueue<Long> evictQueue;
+      // Offset of the next block to preload
+      long startOffset;
+      // Size of the next block to preload
+      long startSize;
+      BlockType expectedType;
+      // Number of blocks left to preload
+      AtomicInteger leftToPreload;
+      // KeyValueContext for the preload requests
+      KeyValueContext preloaderKvContext;
+      // Last started task
+      BlockPreloader lastTask;
+      ScanPreloadMetrics metrics = new ScanPreloadMetrics();
+
+      public BlockingPreloadManager() {
+        lock = new ReentrantLock(true);
+        preloadAttempted = new LinkedBlockingQueue<Long>();
+        leftToPreload = new AtomicInteger(0);
+        preloaderKvContext = new KeyValueContext();
+        evictQueue = new LinkedBlockingQueue<Long>();
+        lastTask = new BlockPreloader(false);
+      }
+      
+      /*
+       * Preloader that reads one or more blocks into the block cache.
+       */
+      class BlockPreloader implements Runnable {
+        boolean run;
+
+        public BlockPreloader(boolean run) {
+          this.run = run;
+        }
+
+        @Override
+        public void run() {
+          while (leftToPreload.get() > 0 && run) {
+            lock.lock();
+            // double check in case we acquired the lock after being already stopped
+            if (!run) {
+              lock.unlock();
+              return;
+            }
+            HFileBlock block = null;
+            try {
+              block =
+                  reader.readBlock(startOffset, startSize, cacheBlocks,
+                    isCompaction, ON_PRELOAD, null, preloaderKvContext);
+            } catch (Throwable e) {
+              // in case of ANY kind of error, we'll mark this block as attempted and let the IPC
+              // Caller handler catch this exception
+              preloadAttempted.add(startOffset);
+              lock.unlock();
+              LOG.error("Exception occured while attempting preload", e);
+              return;
+            }
+            preloadAttempted.add(startOffset);
+            if (block == null) {
+              lock.unlock();
+              return;
+            }
+            if (block.getBlockType().isData()
+                && !preloaderKvContext.getObtainedFromCache()) {
+              evictQueue.add(startOffset);
+            } else if (!block.getBlockType().isData()) {
+              metrics.indexBlocksRead++;
+            }
+            // Otherwise we preloaded this block successfully; ready to move on to the next block
+            startOffset = block.getOffset() + block.getOnDiskSizeWithHeader();
+            startSize = block.getNextBlockOnDiskSizeWithHeader();
+            leftToPreload.decrementAndGet();
+            lock.unlock();
+            if (evictQueue.size() > scanPreloadBlocksKeptInCache) {
+              long offset = evictQueue.poll();
+              hfileReaderV2.evictBlock(offset, isCompaction);
+            }
+          }
+          run = false;
+        }
+      }
+
+      /**
+       * Takes an offset, size, and block type and returns the requested block. Here is how this
+       * function works:
+       * <li>Check if the requested block is already preloaded and in the block cache; if so,
+       * read it directly and start a new preloader if required.</li>
+       * <li>Otherwise, wait for the preloader to finish its current read (if any), check again,
+       * and start a new preloader.</li>
+       * <li>Otherwise, read the block ourselves and start a new preloader.</li>
+       * 
+       * @param offset offset of the requested block
+       * @param size size of the requested block
+       * @param blocktype expected block type of the requested block
+       * @return the requested HFileBlock
+       * @throws IOException
+       * @throws InterruptedException
+       */
+      public HFileBlock getPreloadBlock(long offset, long size,
+          BlockType blocktype) throws IOException, InterruptedException {
+        Long read = preloadAttempted.peek();
+        HFileBlock block;
+        if (read != null && read.equals(offset)) {
+          metrics.ready++;
+          // The block we need is already preloaded
+          preloadAttempted.poll();
+          if (lastTask.run) {
+            leftToPreload.incrementAndGet();
+          } else {
+            leftToPreload.set(scanPreloadBlocksCount - preloadAttempted.size());
+            startNewPreloader();
+          }
+          block =
+              reader.readBlock(offset, size, cacheBlocks, isCompaction, false,
+                blocktype, kvContext);
+        } else {
+          // wait for preloader to finish the current block being read
+          lock.lock();
+          // This will make sure that this preloader will never run.
+          lastTask.run = false;
+          // Unlock the lock for future tasks
+          lock.unlock();
+          // see if we already preloaded
+          read = preloadAttempted.peek();
+          if (read != null && read.equals(offset)) {
+            preloadAttempted.poll();
+            // Continue from where we stopped; we were reading in the right direction.
+            block =
+                reader.readBlock(offset, size, cacheBlocks, isCompaction,
+                  false, blocktype, kvContext);
+            leftToPreload.set(scanPreloadBlocksCount - preloadAttempted.size());
+            startNewPreloader();
+          } else {
+            metrics.inWrongPlaceCount++;
+            if (read == null) {
+              metrics.emptyQueue++;
+            }
+            // We're in the wrong place: read the block, clear the queue, and reset the offset.
+            block =
+                reader.readBlock(offset, size, cacheBlocks, isCompaction,
+                  false, blocktype, kvContext);
+            preloadAttempted.clear();
+            if (block == null) {
+              return null;
+            }
+            startOffset = block.getOffset() + block.getOnDiskSizeWithHeader();
+            startSize = block.getNextBlockOnDiskSizeWithHeader();
+            expectedType = blocktype;
+            leftToPreload.set(scanPreloadBlocksCount);
+            startNewPreloader();
+          }
+        }
+        if (offset < metrics.lastRequestOffset) {
+          metrics.seekBacks++;
+          if (block != null && !block.getBlockType().isData()) {
+            metrics.seekBackNonData++;
+          }
+        }
+        metrics.lastRequestOffset = offset;
+        return block;
+      }
+
+      public boolean startNewPreloader() {
+        if (PreloadThreadPool.getThreadPool().runTask(
+          lastTask = new BlockPreloader(true)) != null) {
+          return true;
+        }
+        return false;
+      }
+
+      public void close() {
+        lock.lock();
+        lastTask.run = false;
+        lock.unlock();
+        while (!evictQueue.isEmpty()) {
+          hfileReaderV2.evictBlock(evictQueue.poll(), isCompaction);
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Preloader metrics " + metrics);
+        }
+      }
+    }
+
+    /**
+     * Reads the requested block using the blocking preload manager.
+     * 
+     * @param offset offset of the target block
+     * @param size size of the target block
+     * @param blocktype expected type of the target block
+     * @return the requested HFileBlock
+     * @throws IOException if any error occurred while reading the block
+     */
+    protected HFileBlock readBlockUsingBlockingPreloadManager(long offset,
+        long size, BlockType blocktype) throws IOException {
+      try {
+        return blockManager.getPreloadBlock(offset, size, blocktype);
+      } catch (InterruptedException e) {
+        return reader.readBlock(offset, size, cacheBlocks, isCompaction, false,
+          blocktype, kvContext);
+      }
+    }
+
+    /**
+     * Closes the current scanner by canceling all ongoing tasks.
+     */
+    @Override
+    public void close() {
+      LOG.info("Closing HFileScanner for file" + hfileReaderV2.getName());
+      if (preloadBlocks) {
+        blockManager.close();
+      }
     }
 
     /**
@@ -594,15 +880,17 @@ public class HFileReaderV2 extends Abstr
         throws IOException {
       HFileBlockIndex.BlockIndexReader indexReader =
           reader.getDataBlockIndexReader();
-      BlockWithScanInfo blockWithScanInfo = 
-        indexReader.loadDataBlockWithScanInfo(key, offset, length, block, 
+      BlockWithScanInfo blockWithScanInfo =
+          indexReader.loadDataBlockWithScanInfo(key, offset, length, block,
             cacheBlocks, isCompaction, this.kvContext);
-      if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
+      if (blockWithScanInfo == null
+          || blockWithScanInfo.getHFileBlock() == null) {
         // This happens if the key e.g. falls before the beginning of the file.
         return -1;
       }
       return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
-          blockWithScanInfo.getNextIndexedKey(), rewind, key, offset, length, false);
+        blockWithScanInfo.getNextIndexedKey(), rewind, key, offset, length,
+        false);
     }
 
     protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
@@ -652,7 +940,8 @@ public class HFileReaderV2 extends Abstr
         throws IOException {
       HFileBlock seekToBlock =
           reader.getDataBlockIndexReader().seekToDataBlock(key, offset, length,
-              block, cacheBlocks, isCompaction, this.kvContext);
+            block, cacheBlocks || preloadBlocks, isCompaction,
+            this.kvContext);
       if (seekToBlock == null) {
         return false;
       }
@@ -671,11 +960,18 @@ public class HFileReaderV2 extends Abstr
         // It is important that we compute and pass onDiskSize to the block
         // reader so that it does not have to read the header separately to
         // figure out the size.
-        seekToBlock = reader.readBlock(previousBlockOffset,
-            seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
-            isCompaction, BlockType.DATA, this.kvContext);
-        // TODO shortcut: seek forward in this block to the last key of the
-        // block.
+        if (preloadBlocks) {
+          seekToBlock =
+              readBlockUsingBlockingPreloadManager(previousBlockOffset,
+                seekToBlock.getOffset() - previousBlockOffset, BlockType.DATA);
+        } else {
+          seekToBlock =
+              reader.readBlock(previousBlockOffset, seekToBlock.getOffset()
+                  - previousBlockOffset, cacheBlocks, isCompaction, false,
+                BlockType.DATA, this.kvContext);
+          // TODO shortcut: seek forward in this block to the last key of the
+          // block.
+        }
       }
       byte[] firstKeyInCurrentBlock = Bytes.getBytes(firstKey);
       loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, offset, length, true);
@@ -707,13 +1003,20 @@ public class HFileReaderV2 extends Abstr
 
         // We are reading the next block without block type validation, because
         // it might turn out to be a non-data block.
-        curBlock = reader.readBlock(curBlock.getOffset()
-            + curBlock.getOnDiskSizeWithHeader(),
-            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks,
-            isCompaction, null, this.kvContext);
+        if (preloadBlocks) {
+          curBlock =
+              readBlockUsingBlockingPreloadManager(curBlock.getOffset()
+                  + curBlock.getOnDiskSizeWithHeader(),
+                curBlock.getNextBlockOnDiskSizeWithHeader(), null);
+        } else {
+          curBlock =
+              reader.readBlock(
+                curBlock.getOffset() + curBlock.getOnDiskSizeWithHeader(),
+                curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks,
+                isCompaction, false, null, this.kvContext);
+        }
       } while (!(curBlock.getBlockType().equals(BlockType.DATA) ||
           curBlock.getBlockType().equals(BlockType.ENCODED_DATA)));
-
       return curBlock;
     }
   }
@@ -723,13 +1026,13 @@ public class HFileReaderV2 extends Abstr
    */
   protected static class ScannerV2 extends AbstractScannerV2 {
     private HFileReaderV2 reader;
-
+    
     public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
-        final boolean isCompaction) {
-      super(r, cacheBlocks, isCompaction);
+        final boolean isCompaction, boolean preloadBlocks) {
+      super(r, cacheBlocks, isCompaction, preloadBlocks);
       this.reader = r;
     }
-
+    
     @Override
     public KeyValue getKeyValue() {
       if (!isSeeked())
@@ -769,7 +1072,7 @@ public class HFileReaderV2 extends Abstr
       currMemstoreTS = 0;
       currMemstoreTSLen = 0;
     }
-
+    
     /**
      * Go to the next key/value in the block section. Loads the next block if
      * necessary. If successful, {@link #getKey()} and {@link #getValue()} can
@@ -807,11 +1110,11 @@ public class HFileReaderV2 extends Abstr
           setNonSeekedState();
           return false;
         }
-
+        
         updateCurrBlock(nextBlock);
         return true;
       }
-
+      
       // We are still in the same block.
       readKeyValueLen();
       return true;
@@ -844,8 +1147,7 @@ public class HFileReaderV2 extends Abstr
       }
 
       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks,
-          isCompaction, BlockType.DATA, this.kvContext);
-
+          isCompaction, false, BlockType.DATA, this.kvContext);
       if (block.getOffset() < 0) {
         throw new IOException("Invalid block offset: " + block.getOffset());
       }
@@ -1047,8 +1349,8 @@ public class HFileReaderV2 extends Abstr
     private final boolean includesMemstoreTS;
 
     public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
-        boolean isCompaction, boolean includesMemstoreTS) {
-      super(reader, cacheBlocks, isCompaction);
+        boolean isCompaction, boolean includesMemstoreTS, boolean preloadBlocks) {
+      super(reader, cacheBlocks, isCompaction, preloadBlocks);
       this.includesMemstoreTS = includesMemstoreTS;
     }
 
@@ -1115,7 +1417,7 @@ public class HFileReaderV2 extends Abstr
       }
 
       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks,
-          isCompaction, BlockType.DATA, this.kvContext);
+          isCompaction, false, BlockType.DATA, this.kvContext);
       if (block.getOffset() < 0) {
         throw new IOException("Invalid block offset: " + block.getOffset());
       }
@@ -1239,5 +1541,13 @@ public class HFileReaderV2 extends Abstr
   public boolean isFileInfoLoaded() {
     return true; // We load file info in constructor in version 2.
   }
-
+  
+  public void evictBlock(long offset, boolean isCompaction) {
+    BlockCacheKey cacheKey =
+        new BlockCacheKey(name, offset,
+            dataBlockEncoder.getEffectiveEncodingInCache(isCompaction), null);
+    if (cacheConf.isBlockCacheEnabled()) {
+      cacheConf.getBlockCache().evictBlock(cacheKey);
+    }
+  }
 }

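A standalone, simplified model of the blocking handoff that BlockingPreloadManager implements above: offsets stand in for real HFile blocks, and the class and method names here are ours, not HBase's.

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.ReentrantLock;

    public class PreloadHandoffSketch {
      private final LinkedBlockingQueue<Long> preloadAttempted =
          new LinkedBlockingQueue<Long>();
      private final ReentrantLock lock = new ReentrantLock(true);
      private final AtomicInteger leftToPreload = new AtomicInteger();
      private long nextOffset; // guarded by lock

      /** Preloader body: read ahead while there is window budget left. */
      void preloadLoop() {
        while (leftToPreload.get() > 0) {
          lock.lock();
          try {
            long off = nextOffset;
            // ... a real preloader reads the block at 'off' into the block cache here ...
            preloadAttempted.add(off);
            nextOffset = off + 1; // advance the sliding window
            leftToPreload.decrementAndGet();
          } finally {
            lock.unlock();
          }
        }
      }

      /** Consumer: use the window if its head matches, otherwise reset it. */
      void getBlock(long offset, int windowSize) {
        Long head = preloadAttempted.peek();
        if (head != null && head.longValue() == offset) {
          preloadAttempted.poll();         // ready: the block was preloaded for us
          leftToPreload.incrementAndGet(); // keep the window full
        } else {
          preloadAttempted.clear();        // wrong place: reset the window
          lock.lock();
          try {
            nextOffset = offset + 1;
            leftToPreload.set(windowSize);
          } finally {
            lock.unlock();
          }
        }
        // ... a real consumer then reads the block at 'offset' (a cache hit if preloaded) ...
      }
    }
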
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java Thu Sep 19 18:18:17 2013
@@ -145,4 +145,9 @@ public interface HFileScanner {
   public boolean isSeeked();
 
   public boolean currKeyValueObtainedFromCache();
+ 
+  /**
+   * Closes the scanner.
+   */
+  public void close();
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Thu Sep 19 18:18:17 2013
@@ -123,7 +123,7 @@ public class HFileWriterV2 extends Abstr
 
     LOG.debug("HFileWriter initialized with " + cacheConf);
   }
-
+  
   /**
    * At a block boundary, write all the inline blocks and opens new block.
    *
@@ -132,7 +132,7 @@ public class HFileWriterV2 extends Abstr
   private void checkBlockBoundary() throws IOException {
     if (fsBlockWriter.blockSizeWritten() < blockSize)
       return;
-
+    
     finishBlock();
     writeInlineBlocks(false);
     newBlock();
@@ -329,7 +329,7 @@ public class HFileWriterV2 extends Abstr
   public void append(final byte[] key, final byte[] value) throws IOException {
     append(0, key, 0, key.length, value, 0, value.length, null);
   }
-
+  
   /**
    * Add key/value to file. Keys must be added in an order that agrees with the
    * Comparator passed on construction.

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/PreloadThreadPool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/PreloadThreadPool.java?rev=1524800&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/PreloadThreadPool.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/PreloadThreadPool.java Thu Sep 19 18:18:17 2013
@@ -0,0 +1,48 @@
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
+
+public class PreloadThreadPool {
+  public static final Log LOG = LogFactory.getLog(PreloadThreadPool.class);
+
+  private static PreloadThreadPool pool;
+
+  private ThreadPoolExecutor preloadThreadsPool;
+
+  private PreloadThreadPool(int minimum, int maximum) {
+    // A new thread pool with the following policy: a submitted task is handed to an idle
+    // thread or, while the number of threads is below the maximum, a new thread is created
+    // to run it; otherwise the submission is rejected (AbortPolicy), which runTask() turns
+    // into a null return for the caller.
+    preloadThreadsPool =
+        new ThreadPoolExecutor(minimum, maximum, 360, TimeUnit.SECONDS,
+            new SynchronousQueue<Runnable>(), new DaemonThreadFactory("scan-preloader-"),
+            new ThreadPoolExecutor.AbortPolicy());
+    preloadThreadsPool.allowCoreThreadTimeOut(true);
+  }
+
+  public static PreloadThreadPool getThreadPool() {
+    return pool;
+  }
+
+  public static synchronized void constructPreloaderThreadPool(int min, int max) {
+    if (pool == null) {
+      pool = new PreloadThreadPool(min, max);
+    }
+  }
+
+  /** Submits a preload task; returns null if the pool rejected it. */
+  public Future<?> runTask(Runnable task) {
+    try {
+      return preloadThreadsPool.submit(task);
+    } catch (RejectedExecutionException e) {
+      LOG.debug("Execution of preloader refused");
+    }
+    return null;
+  }
+}
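
For reviewers, a minimal usage sketch of the pool above. Only PreloadThreadPool
itself comes from this diff; the Runnable body and the fallback behaviour on a
null return are assumptions about the calling scanner:

    // The pool must be constructed once; HRegionServer does this at startup.
    PreloadThreadPool.constructPreloaderThreadPool(2, 8);

    Future<?> f = PreloadThreadPool.getThreadPool().runTask(new Runnable() {
      @Override
      public void run() {
        // stand-in for the real work: read the next data blocks of the
        // scan's sliding window into the block cache
      }
    });
    if (f == null) {
      // every preloader thread was busy and the task was rejected;
      // the scan presumably falls back to synchronous block reads
    }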

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Thu Sep 19 18:18:17 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -324,7 +325,7 @@ public class LoadIncrementalHFiles exten
               .withBloomType(bloomFilterType)
               .withBloomErrorRate(err)
               .build();
-      HFileScanner scanner = halfReader.getScanner(false, false);
+      HFileScanner scanner = halfReader.getScanner(false, false, false);
       scanner.seekTo();
       do {
         KeyValue kv = scanner.getKeyValue();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Sep 19 18:18:17 2013
@@ -181,7 +181,7 @@ public class HRegion implements HeapSize
   // The number of rows are read
   protected final AtomicInteger rowReadCnt = new AtomicInteger(0);
   
-  // The numbe of rows are updated
+  // The number of rows updated
   protected final AtomicInteger rowUpdateCnt = new AtomicInteger(0);
 
   private HRegionServer regionServer = null;
@@ -1749,9 +1749,8 @@ public class HRegion implements HeapSize
    * @return InternalScanner
    * @throws IOException read exceptions
    */
-  public InternalScanner getScanner(Scan scan)
-  throws IOException {
-   return getScanner(scan, null);
+  public InternalScanner getScanner(Scan scan) throws IOException {
+    return getScanner(scan, null);
   }
 
   protected InternalScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Sep 19 18:18:17 2013
@@ -113,6 +113,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.L2BucketCache;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.PreloadThreadPool;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
@@ -477,7 +478,13 @@ public class HRegionServer implements HR
                 return t;
               }
             });
-
+    // Construct threads for preloading
+    int corePreloadThreads =
+        conf.getInt(HConstants.CORE_PRELOAD_THREAD_COUNT,
+          HConstants.DEFAULT_CORE_PRELOAD_THREAD_COUNT);
+    int maxPreloadThreads = 
+        conf.getInt(HConstants.MAX_PRELOAD_THREAD_COUNT, HConstants.DEFAULT_MAX_PRELOAD_THREAD_COUNT);
+    PreloadThreadPool.constructPreloaderThreadPool(corePreloadThreads, maxPreloadThreads);
   }
 
   /**

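A hedged tuning sketch for the pool sizing above. The constants are the ones
referenced in this hunk; their backing property strings and defaults are
defined in the HConstants change in part 1 of this commit, and the values
below are illustrative only:

    // Given the server's Configuration (however it is created), size the
    // preloader pool before the region server starts.
    conf.setInt(HConstants.CORE_PRELOAD_THREAD_COUNT, 4);
    conf.setInt(HConstants.MAX_PRELOAD_THREAD_COUNT, 16);
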
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Thu Sep 19 18:18:17 2013
@@ -480,8 +480,6 @@ public class RegionScanner implements In
     return next(result, limit, null, kvContext);
   }
 
-  
-
   @Override
   public boolean currKeyValueObtainedFromCache() {
     return this.storeHeap.currKeyValueObtainedFromCache();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Sep 19 18:18:17 2013
@@ -942,10 +942,15 @@ public class Store extends SchemaConfigu
   /**
    * Get all scanners with no filtering based on TTL (that happens further down
    * the line).
+   * @param cacheBlocks whether the scanners should cache blocks in the block cache.
+   * @param isCompaction whether the scanners are being used for a compaction.
+   * @param preloadBlocks whether the scanners should preload upcoming data blocks into the block cache.
    * @return all scanners for this store
    */
-  protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
-      boolean isCompaction, ScanQueryMatcher matcher) throws IOException {
+  protected List<KeyValueScanner>
+      getScanners(boolean cacheBlocks, boolean isCompaction,
+          boolean preloadBlocks, ScanQueryMatcher matcher)
+          throws IOException {
     List<StoreFile> storeFiles;
     List<KeyValueScanner> memStoreScanners;
     this.lock.readLock().lock();
@@ -963,7 +968,7 @@ public class Store extends SchemaConfigu
     // actually more correct, since memstore get put at the end.
     List<StoreFileScanner> sfScanners =
         StoreFileScanner.getScannersForStoreFiles(storeFiles, cacheBlocks,
-            isCompaction, matcher);
+          isCompaction, preloadBlocks, matcher);
     List<KeyValueScanner> scanners =
       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
     scanners.addAll(sfScanners);
@@ -1588,7 +1593,7 @@ public class Store extends SchemaConfigu
       firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
     }
     // Get a scanner that caches blocks and that uses pread.
-    HFileScanner scanner = r.getScanner(true, false);
+    HFileScanner scanner = r.getScanner(true, false, false);
     // Seek scanner.  If can't seek it, return.
     if (!seekToScanner(scanner, firstOnRow, firstKV)) return;
     // If we found candidate on firstOnRow, just return. THIS WILL NEVER HAPPEN!

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Thu Sep 19 18:18:17 2013
@@ -1297,7 +1297,7 @@ public class StoreFile extends SchemaCon
      * @return a scanner
      */
     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks) {
-      return getStoreFileScanner(cacheBlocks, false);
+      return getStoreFileScanner(cacheBlocks, false, false);
     }
 
     /**
@@ -1305,12 +1305,14 @@ public class StoreFile extends SchemaCon
      *
      * @param cacheBlocks should this scanner cache blocks?
      * @param isCompaction is scanner being used for compaction?
+     * @param preloadBlocks whether the scanner should preload upcoming data
+     *          blocks into the block cache in a blocking fashion
      * @return a scanner
      */
     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
-                                               boolean isCompaction) {
-      return new StoreFileScanner(this,
-                                 getScanner(cacheBlocks, isCompaction), !isCompaction);
+        boolean isCompaction, boolean preloadBlocks) {
+      return new StoreFileScanner(this, getScanner(cacheBlocks, isCompaction,
+        preloadBlocks), !isCompaction);
     }
 
     /**
@@ -1323,21 +1325,23 @@ public class StoreFile extends SchemaCon
      */
     @Deprecated
     public HFileScanner getScanner(boolean cacheBlocks){
-      return getScanner(cacheBlocks, false);
+      return getScanner(cacheBlocks, false, false);
     }
 
     /**
-     * Warning: Do not write further code which depends on this call. Instead
-     * use getStoreFileScanner() which uses the StoreFileScanner class/interface
-     * which is the preferred way to scan a store with higher level concepts.
-     *
+     * Warning: Do not write further code which depends on this call. Instead use
+     * getStoreFileScanner() which uses the StoreFileScanner class/interface which is the preferred
+     * way to scan a store with higher level concepts.
      * @param cacheBlocks should we cache the blocks?
      * @param isCompaction is scanner being used for compaction?
+     * @param preloadBlocks whether the scanner should preload upcoming data
+     *          blocks into the block cache in a blocking fashion
      * @return the underlying HFileScanner
      */
     @Deprecated
-    public HFileScanner getScanner(boolean cacheBlocks, boolean isCompaction) {
-      return reader.getScanner(cacheBlocks, isCompaction);
+    public HFileScanner getScanner(boolean cacheBlocks, boolean isCompaction,
+        boolean preloadBlocks) {
+      return reader.getScanner(cacheBlocks, isCompaction, preloadBlocks);
     }
 
     public void close(boolean evictOnClose) throws IOException {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Thu Sep 19 18:18:17 2013
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
@@ -49,7 +50,7 @@ public class StoreFileScanner implements
   private boolean realSeekDone;
   private boolean delayedReseek;
   private KeyValue delayedSeekKV;
-
+  
   private boolean enforceMVCC = false;
 
   // The variable, realSeekDone, may cheat on store file scanner for the
@@ -87,10 +88,10 @@ public class StoreFileScanner implements
    * Return an array of scanners corresponding to the given set of store files.
    */
   public static List<StoreFileScanner> getScannersForStoreFiles(
-      Collection<StoreFile> files, boolean cacheBlocks,
-      boolean isCompaction) throws IOException {
+      Collection<StoreFile> files, boolean cacheBlocks, boolean isCompaction)
+      throws IOException {
     return getScannersForStoreFiles(files, cacheBlocks, isCompaction,
-        null);
+      false, null);
   }
 
   /**
@@ -99,13 +100,14 @@ public class StoreFileScanner implements
    * optimization
    */
   public static List<StoreFileScanner> getScannersForStoreFiles(
-      Collection<StoreFile> files, boolean cacheBlocks,
-      boolean isCompaction, ScanQueryMatcher matcher) throws IOException {
+      Collection<StoreFile> files, boolean cacheBlocks, boolean isCompaction,
+      boolean preloadBlocks, ScanQueryMatcher matcher) throws IOException {
     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
         files.size());
     for (StoreFile file : files) {
       StoreFile.Reader r = file.createReader();
-      StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, isCompaction);
+      StoreFileScanner scanner =
+          r.getStoreFileScanner(cacheBlocks, isCompaction, preloadBlocks);
       scanner.setScanQueryMatcher(matcher);
       scanners.add(scanner);
     }
@@ -209,7 +211,7 @@ public class StoreFileScanner implements
   }
 
   public void close() {
-    // Nothing to close on HFileScanner?
+    hfs.close();
     cur = null;
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Thu Sep 19 18:18:17 2013
@@ -270,7 +270,8 @@ public class StoreScanner extends NonLaz
    */
   private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
     final boolean isCompaction = false;
-    return selectScannersFrom(store.getScanners(cacheBlocks, isCompaction, matcher));
+    return selectScannersFrom(store.getScanners(cacheBlocks, isCompaction,
+      scan.isPreloadBlocks(), matcher));
   }
 
   /**

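The StoreScanner change above is where a request opts into preloading: the
flag rides on the Scan object. A client-side sketch, assuming the Scan setter
mirrors the isPreloadBlocks() getter used above (the setter name, table
handle, and row keys are illustrative, not taken from this diff):

    Scan scan = new Scan(Bytes.toBytes("startRow"), Bytes.toBytes("stopRow"));
    scan.setPreloadBlocks(true); // assumed setter paired with isPreloadBlocks()
    ResultScanner rs = table.getScanner(scan);
    try {
      for (Result r : rs) {
        // consume rows while preloader threads pull the next data
        // blocks of each store file into the block cache
      }
    } finally {
      rs.close(); // also closes the underlying HFileScanners (see close() above)
    }
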
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java Thu Sep 19 18:18:17 2013
@@ -42,7 +42,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCacheFactory;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -118,7 +117,11 @@ public class SchemaMetrics {
     READ_COUNT("BlockReadCnt", COMPACTION_AWARE_METRIC_FLAG),
     CACHE_HIT("BlockReadCacheHitCnt", COMPACTION_AWARE_METRIC_FLAG),
     CACHE_MISS("BlockReadCacheMissCnt", COMPACTION_AWARE_METRIC_FLAG),
-
+  
+    PRELOAD_CACHE_HIT("PreloadCacheHitCnt", COMPACTION_AWARE_METRIC_FLAG),
+    PRELOAD_CACHE_MISS("PreloadCacheMissCnt", COMPACTION_AWARE_METRIC_FLAG),
+    PRELOAD_READ_TIME("PreloadReadTime", COMPACTION_AWARE_METRIC_FLAG | TIME_VARYING_METRIC_FLAG),
+    
     CACHE_SIZE("blockCacheSize", PERSISTENT_METRIC_FLAG),
     UNENCODED_CACHE_SIZE("blockCacheUnencodedSize", PERSISTENT_METRIC_FLAG),
     CACHE_NUM_BLOCKS("cacheNumBlocks", PERSISTENT_METRIC_FLAG),
@@ -394,6 +397,18 @@ public class SchemaMetrics {
     return i;
   }
 
+  /** Returns the named block metric as a string, e.g. for logging or tests. */
+  public String getBlockMetric(BlockMetricType type, BlockCategory category,
+      boolean isCompaction) {
+    return String.valueOf(HRegion.getNumericMetric(
+      getBlockMetricName(category, isCompaction, type)));
+  }
+
+  /** Returns the named time-varying block metric as a string. */
+  public String getTimeVaryingBlockMetric(BlockMetricType type,
+      BlockCategory category, boolean isCompaction) {
+    return String.valueOf(HRegion.getTimeVaryingMetric(
+      getBlockMetricName(category, isCompaction, type)));
+  }
+
   public String getBlockMetricName(BlockCategory blockCategory,
       boolean isCompaction, BlockMetricType metricType) {
     if (isCompaction && !metricType.compactionAware()) {
@@ -436,7 +451,7 @@ public class SchemaMetrics {
       addToReadTime(BlockCategory.ALL_CATEGORIES, isCompaction, timeMs);
     }
   }
-
+  
   /**
    * Used to accumulate store metrics across multiple regions in a region
    * server.  These metrics are not "persistent", i.e. we keep overriding them
@@ -488,10 +503,35 @@ public class SchemaMetrics {
     HRegion.incrNumericPersistentMetric(
         storeMetricNames[storeMetricType.ordinal()], value);
   }
+  
+  /**
+   * Updates the cache hit/miss metrics when a block is read.
+   * @param blockCategory category of the block read
+   * @param isCompaction whether this is a compaction read
+   * @param timeMs time taken to read the block
+   * @param preload whether this is a preloaded block
+   * @param obtainedFromCache whether the block was found in the cache
+   */
+  public void updateOnBlockRead(BlockCategory blockCategory,
+      boolean isCompaction, long timeMs, boolean preload,
+      boolean obtainedFromCache) {
+    if (obtainedFromCache) {
+      if (!preload) {
+        updateOnCacheHit(blockCategory, isCompaction, timeMs);
+      } else {
+        updateOnPreloadCacheHit(blockCategory, isCompaction, timeMs);
+      }
+    } else {
+      if (!preload) {
+        updateOnCacheMiss(blockCategory, isCompaction, timeMs);
+      } else {
+        updateOnPreloadCacheMiss(blockCategory, isCompaction, timeMs);
+      }
+    }
+  }
 
   /**
-   * Updates the number of hits and the total number of block reads on a block
-   * cache hit.
+   * Updates the number of hits and the total number of block reads on a block cache hit.
    */
   public void updateOnCacheHit(BlockCategory blockCategory,
       boolean isCompaction) {
@@ -502,6 +542,20 @@ public class SchemaMetrics {
       ALL_SCHEMA_METRICS.updateOnCacheHit(blockCategory, isCompaction);
     }
   }
+  /**
+   * Updates the number of hits and the total number of block reads on a block
+   * cache hit.
+   */
+  public void updateOnCacheHit(BlockCategory blockCategory,
+      boolean isCompaction, long deltaMs) {
+    blockCategory.expectSpecific();
+    incrNumericMetric(blockCategory, isCompaction, BlockMetricType.CACHE_HIT);
+    incrNumericMetric(blockCategory, isCompaction, BlockMetricType.READ_COUNT);
+    addToReadTime(blockCategory, isCompaction, deltaMs);
+    if (this != ALL_SCHEMA_METRICS) {
+      ALL_SCHEMA_METRICS.updateOnCacheHit(blockCategory, isCompaction, deltaMs);
+    }
+  }
 
   /**
    * Updates read time, the number of misses, and the total number of block
@@ -518,8 +572,47 @@ public class SchemaMetrics {
           timeMs);
     }
   }
+  
+  private void addToPreloadReadTime(BlockCategory blockCategory,
+      boolean isCompaction, long timeMs) {
+    HRegion.incrTimeVaryingMetric(getBlockMetricName(blockCategory,
+        isCompaction, BlockMetricType.PRELOAD_READ_TIME), timeMs);
+
+    // Also update the read time aggregated across all block categories
+    if (blockCategory != BlockCategory.ALL_CATEGORIES) {
+      addToPreloadReadTime(BlockCategory.ALL_CATEGORIES, isCompaction, timeMs);
+    }
+  }
+  
+  /**
+   * Updates the preload read time and the number of preload cache misses.
+   */
+  public void updateOnPreloadCacheMiss(BlockCategory blockCategory, boolean isCompaction,
+      long timeMs) {
+    blockCategory.expectSpecific();
+    addToPreloadReadTime(blockCategory, isCompaction, timeMs);
+    incrNumericMetric(blockCategory, isCompaction, BlockMetricType.PRELOAD_CACHE_MISS);
+    if (this != ALL_SCHEMA_METRICS) {
+      ALL_SCHEMA_METRICS.updateOnPreloadCacheMiss(blockCategory, isCompaction, timeMs);
+    }
+  }
 
   /**
+   * Updates the preload read time and the number of preload cache hits.
+   */
+  public void updateOnPreloadCacheHit(BlockCategory blockCategory,
+      boolean isCompaction, long timeMs) {
+    blockCategory.expectSpecific();
+    addToPreloadReadTime(blockCategory, isCompaction, timeMs);
+    incrNumericMetric(blockCategory, isCompaction,
+      BlockMetricType.PRELOAD_CACHE_HIT);
+    if (this != ALL_SCHEMA_METRICS) {
+      ALL_SCHEMA_METRICS.updateOnPreloadCacheHit(blockCategory, isCompaction,
+        timeMs);
+    }
+  }
+  
+  /**
    * Adds the given delta to the cache size for the given block category and
    * the aggregate metric for all block categories. Updates both the per-CF
    * counter and the counter for all CFs (four metrics total). The cache size

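A short sketch of reading the new preload counters back through the accessors
added above. How the SchemaMetrics handle is obtained is outside this hunk,
and the category/compaction arguments are illustrative:

    String hits = schemaMetrics.getBlockMetric(
        BlockMetricType.PRELOAD_CACHE_HIT, BlockCategory.DATA, false);
    String misses = schemaMetrics.getBlockMetric(
        BlockMetricType.PRELOAD_CACHE_MISS, BlockCategory.DATA, false);
    String readTimeMs = schemaMetrics.getTimeVaryingBlockMetric(
        BlockMetricType.PRELOAD_READ_TIME, BlockCategory.DATA, false);
    System.out.println("preload hits=" + hits + " misses=" + misses
        + " readTimeMs=" + readTimeMs);
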
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java Thu Sep 19 18:18:17 2013
@@ -96,7 +96,7 @@ public class CompoundBloomFilter extends
       try {
         // We cache the block and use a positional read.
         bloomBlock = reader.readBlock(index.getRootBlockOffset(block),
-            index.getRootBlockDataSize(block), true, false,
+            index.getRootBlockDataSize(block), true, false, false,
             BlockType.BLOOM_CHUNK, null);
       } catch (IOException ex) {
         // The Bloom filter is broken, turn it off.

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java?rev=1524800&r1=1524799&r2=1524800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java Thu Sep 19 18:18:17 2013
@@ -252,7 +252,7 @@ public class HFilePerformanceEvaluation 
     @Override
     void setUp() throws Exception {
       super.setUp();
-      this.scanner = this.reader.getScanner(false, false);
+      this.scanner = this.reader.getScanner(false, false, false);
       this.scanner.seekTo();
     }
 
@@ -284,7 +284,7 @@ public class HFilePerformanceEvaluation 
 
     @Override
     void doRow(int i) throws Exception {
-      HFileScanner scanner = this.reader.getScanner(false, true);
+      HFileScanner scanner = this.reader.getScanner(false, true, false);
       byte [] b = getRandomRow();
       scanner.seekTo(b);
       ByteBuffer k = scanner.getKey();
@@ -308,7 +308,7 @@ public class HFilePerformanceEvaluation 
 
     @Override
     void doRow(int i) throws Exception {
-      HFileScanner scanner = this.reader.getScanner(false, false);
+      HFileScanner scanner = this.reader.getScanner(false, false, false);
       byte [] b = getRandomRow();
       if (scanner.seekTo(b) != 0) {
         System.out.println("Nonexistent row: " + new String(b));
@@ -342,7 +342,7 @@ public class HFilePerformanceEvaluation 
 
     @Override
     void doRow(int i) throws Exception {
-      HFileScanner scanner = this.reader.getScanner(false, true);
+      HFileScanner scanner = this.reader.getScanner(false, true, false);
       scanner.seekTo(getGaussianRandomRowBytes());
       for (int ii = 0; ii < 30; ii++) {
         if (!scanner.next()) {


