hbase-commits mailing list archives

From ramkris...@apache.org
Subject [3/3] hbase git commit: HBASE-12295 Prevent block eviction under us if reads are in progress from the BBs (Ram)
Date Tue, 21 Jul 2015 15:46:32 GMT
HBASE-12295 Prevent block eviction under us if reads are in progress from
the BBs (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ccb22bd8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ccb22bd8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ccb22bd8

Branch: refs/heads/master
Commit: ccb22bd80dfae64ff27f660254afb224dce268f0
Parents: 3b6db26
Author: ramkrishna <ramkrishna.s.vasudevan@gmail.com>
Authored: Tue Jul 21 21:15:32 2015 +0530
Committer: ramkrishna <ramkrishna.s.vasudevan@gmail.com>
Committed: Tue Jul 21 21:15:32 2015 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ShareableMemory.java    |   36 +
 .../io/encoding/BufferedDataBlockEncoder.java   |    2 +-
 .../hadoop/hbase/io/hfile/HFileContext.java     |   28 +-
 .../hbase/io/hfile/HFileContextBuilder.java     |    9 +-
 .../hadoop/hbase/io/hfile/BlockCache.java       |   13 +
 .../apache/hadoop/hbase/io/hfile/Cacheable.java |   15 +
 .../hbase/io/hfile/CacheableDeserializer.java   |    5 +-
 .../hbase/io/hfile/CombinedBlockCache.java      |   14 +
 .../hbase/io/hfile/CompoundBloomFilter.java     |   14 +-
 .../org/apache/hadoop/hbase/io/hfile/HFile.java |    9 +-
 .../hadoop/hbase/io/hfile/HFileBlock.java       |   34 +-
 .../hadoop/hbase/io/hfile/HFileBlockIndex.java  |  153 +-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java  |  315 ++--
 .../hadoop/hbase/io/hfile/LruBlockCache.java    |    9 +-
 .../hbase/io/hfile/MemcachedBlockCache.java     |    9 +-
 .../hbase/io/hfile/bucket/BucketCache.java      |  119 +-
 .../io/hfile/bucket/ByteBufferIOEngine.java     |   37 +-
 .../hbase/io/hfile/bucket/FileIOEngine.java     |   27 +-
 .../hadoop/hbase/io/hfile/bucket/IOEngine.java  |   21 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  123 +-
 .../hadoop/hbase/regionserver/KeyValueHeap.java |    7 +-
 .../hbase/regionserver/RSRpcServices.java       |  124 +-
 .../regionserver/ReversedRegionScannerImpl.java |    4 +-
 .../hadoop/hbase/regionserver/StoreFile.java    |   11 +-
 .../hadoop/hbase/regionserver/StoreScanner.java |    9 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |   28 +-
 .../client/TestBlockEvictionFromClient.java     | 1441 ++++++++++++++++++
 .../hadoop/hbase/io/hfile/CacheTestUtils.java   |    7 +-
 .../hadoop/hbase/io/hfile/TestCacheConfig.java  |    8 +-
 .../hadoop/hbase/io/hfile/TestCacheOnWrite.java |   29 +
 .../hbase/io/hfile/TestCachedBlockQueue.java    |    5 +
 .../apache/hadoop/hbase/io/hfile/TestHFile.java |    2 +-
 .../hadoop/hbase/io/hfile/TestHFileBlock.java   |    6 +-
 .../hbase/io/hfile/TestHFileBlockIndex.java     |    4 +
 .../hbase/io/hfile/TestLruBlockCache.java       |    5 +
 .../io/hfile/bucket/TestByteBufferIOEngine.java |   13 +-
 .../hbase/io/hfile/bucket/TestFileIOEngine.java |    7 +-
 .../regionserver/TestHeapMemoryManager.java     |   11 +-
 .../TestScannerHeartbeatMessages.java           |   16 +-
 39 files changed, 2377 insertions(+), 352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-common/src/main/java/org/apache/hadoop/hbase/ShareableMemory.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ShareableMemory.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ShareableMemory.java
new file mode 100644
index 0000000..f8b9127
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ShareableMemory.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A Cell implementing this interface indicates that the memory area backing it may be part of a
+ * larger common memory area used by the RegionServer. If an exclusive instance is required, use
+ * {@link #cloneToCell()} to have the contents of the cell copied to an exclusive memory area.
+ */
+@InterfaceAudience.Private
+public interface ShareableMemory {
+  /**
+   * Does a deep copy of the contents to a new memory area and
+   * returns it in the form of a cell.
+   * @return Cell the deep cloned cell
+   */
+  public Cell cloneToCell();
+}
\ No newline at end of file
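
For context, a minimal sketch of how read-path code could consume this interface when a cell
must outlive the block it came from; the CellCopyUtil helper below is illustrative and not part
of this commit.

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ShareableMemory;

public final class CellCopyUtil {
  private CellCopyUtil() {}

  /**
   * If the cell is backed by shared (cache-owned) memory, deep-copy it into an exclusive
   * area so it can safely outlive the block it was read from; otherwise return it as-is.
   */
  public static Cell copyCellIfShared(Cell cell) {
    if (cell instanceof ShareableMemory) {
      return ((ShareableMemory) cell).cloneToCell();
    }
    return cell;
  }
}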

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 1a65223..a758b26 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -283,7 +283,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
   /**
    * Copies only the key part of the keybuffer by doing a deep copy and passes the
    * seeker state members for taking a clone.
-   * Note that the value byte[] part is still pointing to the currentBuffer and the
+   * Note that the value byte[] part is still pointing to the currentBuffer and
    * represented by the valueOffset and valueLength
    */
   // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
index 5edd47d..9945146 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
@@ -56,6 +56,7 @@ public class HFileContext implements HeapSize, Cloneable {
   /** Encryption algorithm and key used */
   private Encryption.Context cryptoContext = Encryption.Context.NONE;
   private long fileCreateTime;
+  private String hfileName;
 
   //Empty constructor.  Go with setters
   public HFileContext() {
@@ -77,12 +78,13 @@ public class HFileContext implements HeapSize, Cloneable {
     this.encoding = context.encoding;
     this.cryptoContext = context.cryptoContext;
     this.fileCreateTime = context.fileCreateTime;
+    this.hfileName = context.hfileName;
   }
 
-  public HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean includesTags,
+  HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean includesTags,
       Compression.Algorithm compressAlgo, boolean compressTags, ChecksumType checksumType,
       int bytesPerChecksum, int blockSize, DataBlockEncoding encoding,
-      Encryption.Context cryptoContext, long fileCreateTime) {
+      Encryption.Context cryptoContext, long fileCreateTime, String hfileName) {
     this.usesHBaseChecksum = useHBaseChecksum;
     this.includesMvcc =  includesMvcc;
     this.includesTags = includesTags;
@@ -96,6 +98,7 @@ public class HFileContext implements HeapSize, Cloneable {
     }
     this.cryptoContext = cryptoContext;
     this.fileCreateTime = fileCreateTime;
+    this.hfileName = hfileName;
   }
 
   /**
@@ -119,10 +122,6 @@ public class HFileContext implements HeapSize, Cloneable {
     return compressAlgo;
   }
 
-  public void setCompression(Compression.Algorithm compressAlgo) {
-    this.compressAlgo = compressAlgo;
-  }
-
   public boolean isUseHBaseChecksum() {
     return usesHBaseChecksum;
   }
@@ -175,10 +174,6 @@ public class HFileContext implements HeapSize, Cloneable {
     return encoding;
   }
 
-  public void setDataBlockEncoding(DataBlockEncoding encoding) {
-    this.encoding = encoding;
-  }
-
   public Encryption.Context getEncryptionContext() {
     return cryptoContext;
   }
@@ -187,6 +182,10 @@ public class HFileContext implements HeapSize, Cloneable {
     this.cryptoContext = cryptoContext;
   }
 
+  public String getHFileName() {
+    return this.hfileName;
+  }
+
   /**
    * HeapSize implementation
    * NOTE : The heapsize should be altered as and when new state variable are added
@@ -196,11 +195,14 @@ public class HFileContext implements HeapSize, Cloneable {
   public long heapSize() {
     long size = ClassSize.align(ClassSize.OBJECT +
         // Algorithm reference, encodingon, checksumtype, Encryption.Context reference
-        4 * ClassSize.REFERENCE +
+        5 * ClassSize.REFERENCE +
         2 * Bytes.SIZEOF_INT +
         // usesHBaseChecksum, includesMvcc, includesTags and compressTags
         4 * Bytes.SIZEOF_BOOLEAN +
         Bytes.SIZEOF_LONG);
+    if (this.hfileName != null) {
+      size += ClassSize.STRING + this.hfileName.length();
+    }
     return size;
   }
 
@@ -227,6 +229,10 @@ public class HFileContext implements HeapSize, Cloneable {
     sb.append(" compressAlgo=");      sb.append(compressAlgo);
     sb.append(" compressTags=");      sb.append(compressTags);
     sb.append(" cryptoContext=[ ");   sb.append(cryptoContext);      sb.append(" ]");
+    if (hfileName != null) {
+      sb.append(" name=");
+      sb.append(hfileName);
+    }
     sb.append(" ]");
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
index a903974..ce3541f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
@@ -53,6 +53,8 @@ public class HFileContextBuilder {
   private Encryption.Context cryptoContext = Encryption.Context.NONE;
   private long fileCreateTime = 0;
 
+  private String hfileName = null;
+
   public HFileContextBuilder withHBaseCheckSum(boolean useHBaseCheckSum) {
     this.usesHBaseChecksum = useHBaseCheckSum;
     return this;
@@ -108,9 +110,14 @@ public class HFileContextBuilder {
     return this;
   }
 
+  public HFileContextBuilder withHFileName(String name) {
+    this.hfileName = name;
+    return this;
+  }
+
   public HFileContext build() {
     return new HFileContext(usesHBaseChecksum, includesMvcc, includesTags, compression,
         compressTags, checksumType, bytesPerChecksum, blocksize, encoding, cryptoContext,
-        fileCreateTime);
+        fileCreateTime, hfileName);
   }
 }
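
A brief sketch of how the new builder option is meant to be used when constructing a context;
the file name value here is purely illustrative.

import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

public class HFileContextNameExample {
  public static void main(String[] args) {
    // Carry the hfile name in the context so block cache keys can later be derived from it
    // (see HFileReaderImpl#returnBlock below).
    HFileContext context = new HFileContextBuilder()
        .withHBaseCheckSum(true)
        .withHFileName("exampleTable/cf/1234abcd") // illustrative name only
        .build();
    System.out.println(context);
  }
}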

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index 57c4be9..cef7e02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile;
 import java.util.Iterator;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 
 /**
  * Block cache interface. Anything that implements the {@link Cacheable}
@@ -116,4 +117,16 @@ public interface BlockCache extends Iterable<CachedBlock> {
    * @return The list of sub blockcaches that make up this one; returns null if no sub caches.
    */
   BlockCache [] getBlockCaches();
+
+  /**
+   * Called when the scanner using the block decides to return the block once its usage
+   * is over.
+   * This API should be called after the block is used; failing to do so may prevent the block
+   * from being evicted, which in turn keeps new hot blocks from being added to the block cache.
+   * The BlockCache implementation decides what to do with the returned block based on its
+   * {@link MemoryType}.
+   * @param cacheKey the cache key of the block
+   * @param block the hfileblock to be returned
+   */
+  void returnBlock(BlockCacheKey cacheKey, Cacheable block);
 }
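
The intended pairing of a cached read with the new returnBlock call, as a rough sketch; it
assumes the pre-existing four-argument BlockCache.getBlock(cacheKey, caching, repeat,
updateCacheMetrics), and the flag values shown are illustrative.

import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.Cacheable;

public class ReturnBlockSketch {
  // Read a block, use it, and hand it back so the cache can decrement its reference count.
  static void readAndReturn(BlockCache cache, BlockCacheKey key) {
    Cacheable block = cache.getBlock(key, true, false, true);
    if (block == null) {
      return; // cache miss
    }
    try {
      // ... consume the block's contents here ...
    } finally {
      // Not returning a SHARED block keeps its reference count up and can block eviction.
      cache.returnBlock(key, block);
    }
  }
}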

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
index f611c61..3c7a19e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
@@ -60,4 +60,19 @@ public interface Cacheable extends HeapSize {
    * @return the block type of this cached HFile block
    */
   BlockType getBlockType();
+
+  /**
+   * @return the {@code MemoryType} of this Cacheable
+   */
+  MemoryType getMemoryType();
+
+  /**
+   * SHARED means that when this Cacheable is read back from the cache it refers to the same
+   * memory area the cache uses to store it.
+   * EXCLUSIVE means that when this Cacheable is read back from the cache, the data has been
+   * copied to a memory area exclusive to this Cacheable.
+   */
+  public static enum MemoryType {
+    SHARED, EXCLUSIVE;
+  }
 }
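
A sketch of the decision a consumer can make from the new memory type; SHARED data must be
copied (for example via ShareableMemory.cloneToCell()) if it needs to outlive the block, while
EXCLUSIVE data is already a private copy.

import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;

public class MemoryTypeCheck {
  // True if data taken from this block must be deep-copied before the block is returned.
  static boolean needsCopyBeforeBlockReturn(Cacheable block) {
    return block.getMemoryType() == MemoryType.SHARED;
  }
}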

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
index 26555ef..a385fd6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 
 /**
@@ -36,14 +37,14 @@ public interface CacheableDeserializer<T extends Cacheable> {
   T deserialize(ByteBuff b) throws IOException;
 
   /**
-   * 
    * @param b
    * @param reuse true if Cacheable object can use the given buffer as its
    *          content
+   * @param memType the {@link MemoryType} of the buffer
    * @return T the deserialized object.
    * @throws IOException
    */
-  T deserialize(ByteBuff b, boolean reuse) throws IOException;
+  T deserialize(ByteBuff b, boolean reuse, MemoryType memType) throws IOException;
 
   /**
    * Get the identifier of this deserialiser. Identifier is unique for each

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 7725cf9..33b0d98 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 /**
  * CombinedBlockCache is an abstraction layer that combines
@@ -219,4 +221,16 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
   public void setMaxSize(long size) {
     this.lruCache.setMaxSize(size);
   }
+
+  @Override
+  public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
+    // Delegate to both caches; each implementation decides whether this is a no-op for it.
+    this.lruCache.returnBlock(cacheKey, block);
+    this.l2Cache.returnBlock(cacheKey, block);
+  }
+
+  @VisibleForTesting
+  public int getRefCount(BlockCacheKey cacheKey) {
+    return ((BucketCache) this.l2Cache).getRefCount(cacheKey);
+  }
 }
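
getRefCount is exposed for tests; a sketch of the kind of assertion a test such as
TestBlockEvictionFromClient could make with it (the cache and key setup are assumed, not shown
here).

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;

public class RefCountAssertionSketch {
  // After every reader has returned the block, its L2 reference count should drop back to 0.
  static void assertBlockFullyReturned(CombinedBlockCache cache, BlockCacheKey key) {
    assertEquals(0, cache.getRefCount(key));
  }
}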

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
index 6890884..9d39a03 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
@@ -119,11 +119,15 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase
             "Failed to load Bloom block for key "
                 + Bytes.toStringBinary(key, keyOffset, keyLength), ex);
       }
-
-      ByteBuff bloomBuf = bloomBlock.getBufferReadOnly();
-      result = BloomFilterUtil.contains(key, keyOffset, keyLength,
-          bloomBuf, bloomBlock.headerSize(),
-          bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
+      try {
+        ByteBuff bloomBuf = bloomBlock.getBufferReadOnly();
+        result =
+            BloomFilterUtil.contains(key, keyOffset, keyLength, bloomBuf, bloomBlock.headerSize(),
+              bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
+      } finally {
+        // After use, return the block in case it was served from the cache.
+        reader.returnBlock(bloomBlock);
+      }
     }
 
     if (numQueriesPerChunk != null && block >= 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 71ac506..48636a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@@ -374,6 +373,12 @@ public class HFile {
         final boolean updateCacheMetrics, BlockType expectedBlockType,
         DataBlockEncoding expectedDataBlockEncoding)
         throws IOException;
+
+    /**
+     * Return the given block back to the cache, if it was obtained from cache.
+     * @param block Block to be returned.
+     */
+    void returnBlock(HFileBlock block);
   }
 
   /** An interface used by clients to open and iterate an {@link HFile}. */
@@ -389,7 +394,7 @@ public class HFile {
 
     HFileScanner getScanner(boolean cacheBlocks, final boolean pread, final boolean isCompaction);
 
-    ByteBuff getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException;
+    HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException;
 
     Map<byte[], byte[]> loadFileInfo() throws IOException;
 

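Since getMetaBlock now returns the HFileBlock rather than a ByteBuff, callers are expected to
extract the buffer themselves and return the block once done; a minimal sketch assuming an
HFile.Reader instance.

import java.io.IOException;

import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.nio.ByteBuff;

public class MetaBlockSketch {
  static void readMetaBlock(HFile.Reader reader, String metaBlockName) throws IOException {
    HFileBlock metaBlock = reader.getMetaBlock(metaBlockName, false);
    if (metaBlock == null) {
      return; // no such meta block
    }
    try {
      ByteBuff buf = metaBlock.getBufferWithoutHeader();
      // ... use buf ...
    } finally {
      reader.returnBlock(metaBlock);
    }
  }
}
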
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 8672d62..7104267 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -121,7 +121,8 @@ public class HFileBlock implements Cacheable {
 
   static final CacheableDeserializer<Cacheable> blockDeserializer =
       new CacheableDeserializer<Cacheable>() {
-        public HFileBlock deserialize(ByteBuff buf, boolean reuse) throws IOException{
+        public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
+            throws IOException {
           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
           ByteBuff newByteBuffer;
           if (reuse) {
@@ -135,7 +136,7 @@ public class HFileBlock implements Cacheable {
           buf.position(buf.limit());
           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
           boolean usesChecksum = buf.get() == (byte)1;
-          HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
+          HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum, memType);
           hFileBlock.offset = buf.getLong();
           hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
           if (hFileBlock.hasNextBlockHeader()) {
@@ -152,7 +153,7 @@ public class HFileBlock implements Cacheable {
         @Override
         public HFileBlock deserialize(ByteBuff b) throws IOException {
           // Used only in tests
-          return deserialize(b, false);
+          return deserialize(b, false, MemoryType.EXCLUSIVE);
         }
       };
   private static final int deserializerIdentifier;
@@ -198,6 +199,8 @@ public class HFileBlock implements Cacheable {
    */
   private int nextBlockOnDiskSizeWithHeader = -1;
 
+  private MemoryType memType = MemoryType.EXCLUSIVE;
+
   /**
    * Creates a new {@link HFile} block from the given fields. This constructor
    * is mostly used when the block data has already been read and uncompressed,
@@ -255,15 +258,24 @@ public class HFileBlock implements Cacheable {
   HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
     this(new SingleByteBuff(b), usesHBaseChecksum);
   }
+
   /**
    * Creates a block from an existing buffer starting with a header. Rewinds
    * and takes ownership of the buffer. By definition of rewind, ignores the
    * buffer position, but if you slice the buffer beforehand, it will rewind
-   * to that point. The reason this has a minorNumber and not a majorNumber is
-   * because majorNumbers indicate the format of a HFile whereas minorNumbers
-   * indicate the format inside a HFileBlock.
+   * to that point.
    */
   HFileBlock(ByteBuff b, boolean usesHBaseChecksum) throws IOException {
+    this(b, usesHBaseChecksum, MemoryType.EXCLUSIVE);
+  }
+
+  /**
+   * Creates a block from an existing buffer starting with a header. Rewinds
+   * and takes ownership of the buffer. By definition of rewind, ignores the
+   * buffer position, but if you slice the buffer beforehand, it will rewind
+   * to that point.
+   */
+  HFileBlock(ByteBuff b, boolean usesHBaseChecksum, MemoryType memType) throws IOException {
     b.rewind();
     blockType = BlockType.read(b);
     onDiskSizeWithoutHeader = b.getInt();
@@ -282,6 +294,7 @@ public class HFileBlock implements Cacheable {
                                        HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
     }
     this.fileContext = contextBuilder.build();
+    this.memType = memType;
     buf = b;
     buf.rewind();
   }
@@ -650,8 +663,8 @@ public class HFileBlock implements Cacheable {
   public long heapSize() {
     long size = ClassSize.align(
         ClassSize.OBJECT +
-        // Block type, multi byte buffer and meta references
-        3 * ClassSize.REFERENCE +
+        // Block type, multi byte buffer, MemoryType and meta references
+        4 * ClassSize.REFERENCE +
         // On-disk size, uncompressed size, and next block's on-disk size
         // bytePerChecksum and onDiskDataSize
         4 * Bytes.SIZEOF_INT +
@@ -1885,6 +1898,11 @@ public class HFileBlock implements Cacheable {
     return this.fileContext;
   }
 
+  @Override
+  public MemoryType getMemoryType() {
+    return this.memType;
+  }
+
   /**
    * Convert the contents of the block header into a human readable string.
    * This is mostly helpful for debugging. This assumes that the block

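A sketch of how a caller backed by cache-owned memory might drive the new three-argument
deserialize; whether SHARED or EXCLUSIVE is passed depends on whether the supplied buffer
aliases the cache's own memory (the helper shown here is hypothetical).

import java.io.IOException;

import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;

public class DeserializeSketch {
  // buf aliases the cache's own memory, so the resulting block is marked SHARED and must be
  // handed back via BlockCache.returnBlock() once the reader is done with it.
  static Cacheable readFromSharedMemory(CacheableDeserializer<Cacheable> deserializer,
      ByteBuff buf) throws IOException {
    return deserializer.deserialize(buf, true, MemoryType.SHARED);
  }
}
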
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 30cf7ab..3494145 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hbase.Cell;
@@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
@@ -295,78 +295,90 @@ public class HFileBlockIndex {
       int lookupLevel = 1; // How many levels deep we are in our lookup.
       int index = -1;
 
-      HFileBlock block;
+      HFileBlock block = null;
+      boolean dataBlock = false;
       KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
       while (true) {
-
-        if (currentBlock != null && currentBlock.getOffset() == currentOffset)
-        {
-          // Avoid reading the same block again, even with caching turned off.
-          // This is crucial for compaction-type workload which might have
-          // caching turned off. This is like a one-block cache inside the
-          // scanner.
-          block = currentBlock;
-        } else {
-          // Call HFile's caching block reader API. We always cache index
-          // blocks, otherwise we might get terrible performance.
-          boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
-          BlockType expectedBlockType;
-          if (lookupLevel < searchTreeLevel - 1) {
-            expectedBlockType = BlockType.INTERMEDIATE_INDEX;
-          } else if (lookupLevel == searchTreeLevel - 1) {
-            expectedBlockType = BlockType.LEAF_INDEX;
+        try {
+          if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
+            // Avoid reading the same block again, even with caching turned off.
+            // This is crucial for compaction-type workload which might have
+            // caching turned off. This is like a one-block cache inside the
+            // scanner.
+            block = currentBlock;
           } else {
-            // this also accounts for ENCODED_DATA
-            expectedBlockType = BlockType.DATA;
+            // Call HFile's caching block reader API. We always cache index
+            // blocks, otherwise we might get terrible performance.
+            boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
+            BlockType expectedBlockType;
+            if (lookupLevel < searchTreeLevel - 1) {
+              expectedBlockType = BlockType.INTERMEDIATE_INDEX;
+            } else if (lookupLevel == searchTreeLevel - 1) {
+              expectedBlockType = BlockType.LEAF_INDEX;
+            } else {
+              // this also accounts for ENCODED_DATA
+              expectedBlockType = BlockType.DATA;
+            }
+            block =
+                cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread,
+                  isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
           }
-          block = cachingBlockReader.readBlock(currentOffset,
-          currentOnDiskSize, shouldCache, pread, isCompaction, true,
-              expectedBlockType, expectedDataBlockEncoding);
-        }
 
-        if (block == null) {
-          throw new IOException("Failed to read block at offset " +
-              currentOffset + ", onDiskSize=" + currentOnDiskSize);
-        }
+          if (block == null) {
+            throw new IOException("Failed to read block at offset " + currentOffset
+                + ", onDiskSize=" + currentOnDiskSize);
+          }
 
-        // Found a data block, break the loop and check our level in the tree.
-        if (block.getBlockType().isData()) {
-          break;
-        }
+          // Found a data block, break the loop and check our level in the tree.
+          if (block.getBlockType().isData()) {
+            dataBlock = true;
+            break;
+          }
 
-        // Not a data block. This must be a leaf-level or intermediate-level
-        // index block. We don't allow going deeper than searchTreeLevel.
-        if (++lookupLevel > searchTreeLevel) {
-          throw new IOException("Search Tree Level overflow: lookupLevel="+
-              lookupLevel + ", searchTreeLevel=" + searchTreeLevel);
-        }
+          // Not a data block. This must be a leaf-level or intermediate-level
+          // index block. We don't allow going deeper than searchTreeLevel.
+          if (++lookupLevel > searchTreeLevel) {
+            throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel
+                + ", searchTreeLevel=" + searchTreeLevel);
+          }
 
-        // Locate the entry corresponding to the given key in the non-root
-        // (leaf or intermediate-level) index block.
-        ByteBuff buffer = block.getBufferWithoutHeader();
-        index = locateNonRootIndexEntry(buffer, key, comparator);
-        if (index == -1) {
-          // This has to be changed
-          // For now change this to key value
-          KeyValue kv = KeyValueUtil.ensureKeyValue(key);
-          throw new IOException("The key "
-              + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
-              + " is before the" + " first key of the non-root index block "
-              + block);
-        }
+          // Locate the entry corresponding to the given key in the non-root
+          // (leaf or intermediate-level) index block.
+          ByteBuff buffer = block.getBufferWithoutHeader();
+          index = locateNonRootIndexEntry(buffer, key, comparator);
+          if (index == -1) {
+            // This has to be changed
+            // For now change this to key value
+            KeyValue kv = KeyValueUtil.ensureKeyValue(key);
+            throw new IOException("The key "
+                + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
+                + " is before the" + " first key of the non-root index block " + block);
+          }
 
-        currentOffset = buffer.getLong();
-        currentOnDiskSize = buffer.getInt();
+          currentOffset = buffer.getLong();
+          currentOnDiskSize = buffer.getInt();
 
-        // Only update next indexed key if there is a next indexed key in the current level
-        byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
-        if (nonRootIndexedKey != null) {
-          tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
-          nextIndexedKey = tmpNextIndexKV;
+          // Only update next indexed key if there is a next indexed key in the current level
+          byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
+          if (nonRootIndexedKey != null) {
+            tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
+            nextIndexedKey = tmpNextIndexKV;
+          }
+        } finally {
+          if (!dataBlock) {
+            // Return the block immediately if it is not the
+            // data block
+            cachingBlockReader.returnBlock(block);
+          }
         }
       }
 
       if (lookupLevel != searchTreeLevel) {
+        assert dataBlock == true;
+        // We reached a data block, but the lookup level is inconsistent with the tree depth.
+        // Return the block so that its ref count is decremented before we throw.
+        cachingBlockReader.returnBlock(block);
         throw new IOException("Reached a data block at level " + lookupLevel +
             " but the number of levels is " + searchTreeLevel);
       }
@@ -396,16 +408,19 @@ public class HFileBlockIndex {
         HFileBlock midLeafBlock = cachingBlockReader.readBlock(
             midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
             BlockType.LEAF_INDEX, null);
-
-        ByteBuff b = midLeafBlock.getBufferWithoutHeader();
-        int numDataBlocks = b.getIntAfterPosition(0);
-        int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1));
-        int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
-            keyRelOffset;
-        int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
-            + SECONDARY_INDEX_ENTRY_OVERHEAD;
-        byte[] bytes = b.toBytes(keyOffset, keyLen);
-        targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
+        try {
+          ByteBuff b = midLeafBlock.getBufferWithoutHeader();
+          int numDataBlocks = b.getIntAfterPosition(0);
+          int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1));
+          int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset;
+          int keyOffset =
+              Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
+                  + SECONDARY_INDEX_ENTRY_OVERHEAD;
+          byte[] bytes = b.toBytes(keyOffset, keyLen);
+          targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
+        } finally {
+          cachingBlockReader.returnBlock(midLeafBlock);
+        }
       } else {
         // The middle of the root-level index.
         targetMidKey = blockKeys[rootCount / 2];

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 4189320..4a11b14 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -34,10 +34,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ShareableMemory;
 import org.apache.hadoop.hbase.SizeCachedKeyValue;
-import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
@@ -256,6 +258,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
               }
               HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
                 null, null);
+              // No need to update the current block. The readBlock here should not find the
+              // block in the cache; it is issued so that the block is read from the FS and
+              // cached in the block cache, so no reference count increment happens here.
+              // The return below is then effectively a no-op because the block is not SHARED.
+              returnBlock(block);
               prevBlock = block;
               offset += block.getOnDiskSizeWithHeader();
             }
@@ -337,6 +344,15 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     return fileSize;
   }
 
+  @Override
+  public void returnBlock(HFileBlock block) {
+    BlockCache blockCache = this.cacheConf.getBlockCache();
+    if (blockCache != null && block != null) {
+      BlockCacheKey cacheKey = new BlockCacheKey(this.getFileContext().getHFileName(),
+          block.getOffset());
+      blockCache.returnBlock(cacheKey, block);
+    }
+  }
   /**
    * @return the first key in the file. May be null if file has no entries. Note
    *         that this is not the first row key, but rather the byte form of the
@@ -449,7 +465,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     protected final HFile.Reader reader;
     private int currTagsLen;
     private KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue();
-    protected HFileBlock block;
     // A pair for reusing in blockSeek() so that we don't garbage lot of objects
     final Pair<ByteBuffer, Integer> pair = new Pair<ByteBuffer, Integer>();
 
@@ -461,6 +476,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
      * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet.
      */
     protected Cell nextIndexedKey;
+    // Current block being used
+    protected HFileBlock curBlock;
+    // Previous blocks that were used in the course of the read
+    protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<HFileBlock>();
 
     public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
@@ -470,6 +489,41 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       this.isCompaction = isCompaction;
     }
 
+    void updateCurrBlockRef(HFileBlock block) {
+      if (block != null && this.curBlock != null &&
+          block.getOffset() == this.curBlock.getOffset()) {
+        return;
+      }
+      if (this.curBlock != null) {
+        prevBlocks.add(this.curBlock);
+      }
+      this.curBlock = block;
+    }
+
+    void reset() {
+      if (this.curBlock != null) {
+        this.prevBlocks.add(this.curBlock);
+      }
+      this.curBlock = null;
+    }
+
+    private void returnBlockToCache(HFileBlock block) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Returning the block : " + block);
+      }
+      this.reader.returnBlock(block);
+    }
+
+    private void returnBlocks(boolean returnAll) {
+      for (int i = 0; i < this.prevBlocks.size(); i++) {
+        returnBlockToCache(this.prevBlocks.get(i));
+      }
+      this.prevBlocks.clear();
+      if (returnAll && this.curBlock != null) {
+        returnBlockToCache(this.curBlock);
+        this.curBlock = null;
+      }
+    }
     @Override
     public boolean isSeeked(){
       return blockBuffer != null;
@@ -498,6 +552,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       return kvBufSize;
     }
 
+    @Override
+    public void close() {
+      this.returnBlocks(true);
+    }
+
     protected int getNextCellStartPosition() {
       int nextKvPos =  blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
           + currMemstoreTSLen;
@@ -536,7 +595,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     private final void checkTagsLen() {
       if (checkLen(this.currTagsLen)) {
         throw new IllegalStateException("Invalid currTagsLen " + this.currTagsLen +
-          ". Block offset: " + block.getOffset() + ", block length: " + this.blockBuffer.limit() +
+          ". Block offset: " + curBlock.getOffset() + ", block length: " +
+            this.blockBuffer.limit() +
           ", position: " + this.blockBuffer.position() + " (without header).");
       }
     }
@@ -610,7 +670,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
             || vlen > blockBuffer.limit()) {
           throw new IllegalStateException("Invalid klen " + klen + " or vlen "
               + vlen + ". Block offset: "
-              + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+              + curBlock.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
               + blockBuffer.position() + " (without header).");
         }
         offsetFromPos += Bytes.SIZEOF_LONG;
@@ -626,7 +686,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
               ^ (blockBuffer.getByteAfterPosition(offsetFromPos + 1) & 0xff);
           if (tlen < 0 || tlen > blockBuffer.limit()) {
             throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: "
-                + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+                + curBlock.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
                 + blockBuffer.position() + " (without header).");
           }
           // add the two bytes read for the tags.
@@ -641,8 +701,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
             if (lastKeyValueSize < 0) {
               throw new IllegalStateException("blockSeek with seekBefore "
                   + "at the first key of the block: key=" + CellUtil.getCellKeyAsString(key)
-                  + ", blockOffset=" + block.getOffset() + ", onDiskSize="
-                  + block.getOnDiskSizeWithHeader());
+                  + ", blockOffset=" + curBlock.getOffset() + ", onDiskSize="
+                  + curBlock.getOnDiskSizeWithHeader());
             }
             blockBuffer.moveBack(lastKeyValueSize);
             readKeyValueLen();
@@ -709,8 +769,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
             // smaller than
             // the next indexed key or the current data block is the last data
             // block.
-            return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false);
+            return loadBlockAndSeekToKey(this.curBlock, nextIndexedKey, false, key,
+                false);
           }
+
         }
       }
       // Don't rewind on a reseek operation, because reseek implies that we are
@@ -734,10 +796,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
      */
     public int seekTo(Cell key, boolean rewind) throws IOException {
       HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader();
-      BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block,
+      BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, curBlock,
           cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding());
       if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
-        // This happens if the key e.g. falls before the beginning of the file.
+        // This happens if the key e.g. falls before the beginning of the
+        // file.
         return -1;
       }
       return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
@@ -746,7 +809,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
 
     @Override
     public boolean seekBefore(Cell key) throws IOException {
-      HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block,
+      HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, curBlock,
           cacheBlocks, pread, isCompaction, reader.getEffectiveEncodingInCache(isCompaction));
       if (seekToBlock == null) {
         return false;
@@ -761,6 +824,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
           return false;
         }
 
+        // The first key in the current block 'seekToBlock' is greater than the given 
+        // seekBefore key. We will go ahead by reading the next block that satisfies the
+        // given key. Return the current block before reading the next one.
+        reader.returnBlock(seekToBlock);
         // It is important that we compute and pass onDiskSize to the block
         // reader so that it does not have to read the header separately to
         // figure out the size.
@@ -783,28 +850,33 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
      */
     protected HFileBlock readNextDataBlock() throws IOException {
       long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
-      if (block == null)
+      if (curBlock == null)
         return null;
 
-      HFileBlock curBlock = block;
+      HFileBlock block = this.curBlock;
 
       do {
-        if (curBlock.getOffset() >= lastDataBlockOffset)
+        if (block.getOffset() >= lastDataBlockOffset)
           return null;
 
-        if (curBlock.getOffset() < 0) {
+        if (block.getOffset() < 0) {
           throw new IOException("Invalid block file offset: " + block);
         }
 
         // We are reading the next block without block type validation, because
         // it might turn out to be a non-data block.
-        curBlock = reader.readBlock(curBlock.getOffset()
-            + curBlock.getOnDiskSizeWithHeader(),
-            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
+        block = reader.readBlock(block.getOffset()
+            + block.getOnDiskSizeWithHeader(),
+            block.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
             isCompaction, true, null, getEffectiveDataBlockEncoding());
-      } while (!curBlock.getBlockType().isData());
+        if (block != null && !block.getBlockType().isData()) {
+          // Whatever block we read, return it to the cache unless it is a data block;
+          // non-data blocks are not needed past this point.
+          reader.returnBlock(block);
+        }
+      } while (!block.getBlockType().isData());
 
-      return curBlock;
+      return block;
     }
 
     public DataBlockEncoding getEffectiveDataBlockEncoding() {
@@ -817,13 +889,27 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
         return null;
 
       KeyValue ret;
+      // TODO : reduce the varieties of KV here. Check if based on a boolean
+      // we can handle the 'no tags' case
+      // TODO : Handle MBB here
       if (currTagsLen > 0) {
-        ret = new SizeCachedKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
-            + blockBuffer.position(), getCellBufSize());
+        if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
+          ret = new ShareableMemoryKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+                          + blockBuffer.position(), getCellBufSize());
+        } else {
+          ret = new SizeCachedKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+                  + blockBuffer.position(), getCellBufSize());
+        }
       } else {
-        ret = new SizeCachedNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+        if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
+          ret = new ShareableMemoryNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+            + blockBuffer.position(), getCellBufSize());
+        } else {
+          ret = new SizeCachedNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
             + blockBuffer.position(), getCellBufSize());
+        }
       }
+
       if (this.reader.shouldIncludeMemstoreTS()) {
         ret.setSequenceId(currMemstoreTS);
       }
@@ -838,6 +924,32 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
               + KEY_VALUE_LEN_SIZE, currKeyLen);
     }
 
+    private static class ShareableMemoryKeyValue extends SizeCachedKeyValue implements
+        ShareableMemory {
+      public ShareableMemoryKeyValue(byte[] bytes, int offset, int length) {
+        super(bytes, offset, length);
+      }
+
+      @Override
+      public Cell cloneToCell() {
+        byte[] copy = Bytes.copy(this.bytes, this.offset, this.length);
+        return new SizeCachedKeyValue(copy, 0, copy.length);
+      }
+    }
+
+    private static class ShareableMemoryNoTagsKeyValue extends SizeCachedNoTagsKeyValue implements
+        ShareableMemory {
+      public ShareableMemoryNoTagsKeyValue(byte[] bytes, int offset, int length) {
+        super(bytes, offset, length);
+      }
+
+      @Override
+      public Cell cloneToCell() {
+        byte[] copy = Bytes.copy(this.bytes, this.offset, this.length);
+        return new SizeCachedNoTagsKeyValue(copy, 0, copy.length);
+      }
+    }
+
     @Override
     public ByteBuffer getValue() {
       assertSeeked();
@@ -849,7 +961,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     }
 
     protected void setNonSeekedState() {
-      block = null;
+      reset();
       blockBuffer = null;
       currKeyLen = 0;
       currValueLen = 0;
@@ -869,7 +981,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
             + "; currKeyLen = " + currKeyLen + "; currValLen = "
             + currValueLen + "; block limit = " + blockBuffer.limit()
             + "; HFile name = " + reader.getName()
-            + "; currBlock currBlockOffset = " + block.getOffset());
+            + "; currBlock currBlockOffset = " + this.curBlock.getOffset());
         throw e;
       }
     }
@@ -882,7 +994,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     private boolean positionForNextBlock() throws IOException {
       // Methods are small so they get inlined because they are 'hot'.
       long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
-      if (block.getOffset() >= lastDataBlockOffset) {
+      if (this.curBlock.getOffset() >= lastDataBlockOffset) {
         setNonSeekedState();
         return false;
       }
@@ -897,7 +1009,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
         setNonSeekedState();
         return false;
       }
-      updateCurrBlock(nextBlock);
+      updateCurrentBlock(nextBlock);
       return true;
     }
 
@@ -946,27 +1058,37 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
         return false;
       }
 
-      long firstDataBlockOffset =
-          reader.getTrailer().getFirstDataBlockOffset();
-      if (block != null && block.getOffset() == firstDataBlockOffset) {
-        blockBuffer.rewind();
-        readKeyValueLen();
-        return true;
+      long firstDataBlockOffset = reader.getTrailer().getFirstDataBlockOffset();
+      if (curBlock != null
+          && curBlock.getOffset() == firstDataBlockOffset) {
+        return processFirstDataBlock();
       }
 
-      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
+      readAndUpdateNewBlock(firstDataBlockOffset);
+      return true;
+    }
+
+    protected boolean processFirstDataBlock() throws IOException{
+      blockBuffer.rewind();
+      readKeyValueLen();
+      return true;
+    }
+
+    protected void readAndUpdateNewBlock(long firstDataBlockOffset) throws IOException,
+        CorruptHFileException {
+      HFileBlock newBlock = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
           isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
-      if (block.getOffset() < 0) {
-        throw new IOException("Invalid block offset: " + block.getOffset());
+      if (newBlock.getOffset() < 0) {
+        throw new IOException("Invalid block offset: " + newBlock.getOffset());
       }
-      updateCurrBlock(block);
-      return true;
+      updateCurrentBlock(newBlock);
     }
 
     protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
         boolean rewind, Cell key, boolean seekBefore) throws IOException {
-      if (block == null || block.getOffset() != seekToBlock.getOffset()) {
-        updateCurrBlock(seekToBlock);
+      if (this.curBlock == null
+          || this.curBlock.getOffset() != seekToBlock.getOffset()) {
+        updateCurrentBlock(seekToBlock);
       } else if (rewind) {
         blockBuffer.rewind();
       }
@@ -989,10 +1111,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
      */
     protected final void checkKeyValueLen() {
       if (checkLen(this.currKeyLen) || checkLen(this.currValueLen)) {
-        throw new IllegalStateException("Invalid currKeyLen " + this.currKeyLen +
-          " or currValueLen " + this.currValueLen + ". Block offset: " + block.getOffset() +
-          ", block length: " + this.blockBuffer.limit() + ", position: " +
-           this.blockBuffer.position() + " (without header).");
+        throw new IllegalStateException("Invalid currKeyLen " + this.currKeyLen
+            + " or currValueLen " + this.currValueLen + ". Block offset: "
+            + this.curBlock.getOffset() + ", block length: "
+            + this.blockBuffer.limit() + ", position: " + this.blockBuffer.position()
+            + " (without header).");
       }
     }
 
@@ -1002,19 +1125,18 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
      *
      * @param newBlock the block to make current
      */
-    protected void updateCurrBlock(HFileBlock newBlock) {
-      block = newBlock;
-
+    protected void updateCurrentBlock(HFileBlock newBlock) throws IOException {
+      // Set the active block on the reader
       // sanity check
-      if (block.getBlockType() != BlockType.DATA) {
-        throw new IllegalStateException("Scanner works only on data " +
-            "blocks, got " + block.getBlockType() + "; " +
-            "fileName=" + reader.getName() + ", " +
-            "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " +
-            "isCompaction=" + isCompaction);
+      if (newBlock.getBlockType() != BlockType.DATA) {
+        throw new IllegalStateException("Scanner works only on data blocks, got "
+            + newBlock.getBlockType() + "; fileName=" + reader.getName()
+            + ", dataBlockEncoder=" + reader.getDataBlockEncoding() + ", isCompaction="
+            + isCompaction);
       }
 
-      blockBuffer = block.getBufferWithoutHeader();
+      updateCurrBlockRef(newBlock);
+      blockBuffer = newBlock.getBufferWithoutHeader();
       readKeyValueLen();
       blockFetches++;
 
@@ -1058,13 +1180,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     }
 
     @Override
-    public void close() {
-      // HBASE-12295 will add code here.
-    }
-
-    @Override
     public void shipped() throws IOException {
-      // HBASE-12295 will add code here.
+      this.returnBlocks(false);
     }
   }
 
@@ -1127,8 +1244,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
          updateCacheMetrics);
        if (cachedBlock != null) {
          if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
-           cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader);
-         }
+           HFileBlock compressedBlock = cachedBlock;
+           cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
+           // If unpacking produced a new block, the compressed block can be returned to the cache
+          if (compressedBlock != cachedBlock) {
+            cache.returnBlock(cacheKey, compressedBlock);
+          }
+        }
          validateBlockType(cachedBlock, expectedBlockType);
 
          if (expectedDataBlockEncoding == null) {
@@ -1163,6 +1285,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
                      " because of a data block encoding mismatch" +
                      "; expected: " + expectedDataBlockEncoding +
                      ", actual: " + actualDataBlockEncoding);
+             // This is an error scenario, so decrement the reference count before
+             // evicting the block.
+             cache.returnBlock(cacheKey, cachedBlock);
              cache.evictBlock(cacheKey);
            }
            return null;
@@ -1180,7 +1305,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
    * @throws IOException
    */
   @Override
-  public ByteBuff getMetaBlock(String metaBlockName, boolean cacheBlock)
+  public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock)
       throws IOException {
     if (trailer.getMetaIndexCount() == 0) {
       return null; // there are no meta blocks
@@ -1213,7 +1338,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
           assert cachedBlock.isUnpacked() : "Packed block leak.";
           // Return a distinct 'shallow copy' of the block,
           // so pos does not get messed by the scanner
-          return cachedBlock.getBufferWithoutHeader();
+          return cachedBlock;
         }
         // Cache Miss, please load.
       }
@@ -1227,7 +1352,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
             cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
       }
 
-      return metaBlock.getBufferWithoutHeader();
+      return metaBlock;
     }
   }
 
@@ -1424,7 +1549,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
 
     @Override
     public boolean isSeeked(){
-      return this.block != null;
+      return curBlock != null;
+    }
+
+    public void setNonSeekedState() {
+      reset();
     }
 
     /**
@@ -1434,21 +1563,21 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
      * @param newBlock the block to make current
      * @throws CorruptHFileException
      */
-    private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
-      block = newBlock;
+    @Override
+    protected void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
 
       // sanity checks
-      if (block.getBlockType() != BlockType.ENCODED_DATA) {
-        throw new IllegalStateException(
-            "EncodedScanner works only on encoded data blocks");
+      if (newBlock.getBlockType() != BlockType.ENCODED_DATA) {
+        throw new IllegalStateException("EncodedScanner works only on encoded data blocks");
       }
-      short dataBlockEncoderId = block.getDataBlockEncodingId();
+      short dataBlockEncoderId = newBlock.getDataBlockEncodingId();
       if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
         String encoderCls = dataBlockEncoder.getClass().getName();
         throw new CorruptHFileException("Encoder " + encoderCls
           + " doesn't support data block encoding "
           + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
       }
+      updateCurrBlockRef(newBlock);
       ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
       seeker.setCurrentBuffer(encodedBuffer);
       blockFetches++;
@@ -1467,29 +1596,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     }
 
     @Override
-    public boolean seekTo() throws IOException {
-      if (reader == null) {
-        return false;
-      }
-
-      if (reader.getTrailer().getEntryCount() == 0) {
-        // No data blocks.
-        return false;
-      }
-
-      long firstDataBlockOffset =
-          reader.getTrailer().getFirstDataBlockOffset();
-      if (block != null && block.getOffset() == firstDataBlockOffset) {
-        seeker.rewind();
-        return true;
-      }
-
-      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
-          isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
-      if (block.getOffset() < 0) {
-        throw new IOException("Invalid block offset: " + block.getOffset());
-      }
-      updateCurrentBlock(block);
+    protected boolean processFirstDataBlock() throws IOException {
+      seeker.rewind();
       return true;
     }
 
@@ -1497,10 +1605,12 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     public boolean next() throws IOException {
       boolean isValid = seeker.next();
       if (!isValid) {
-        block = readNextDataBlock();
-        isValid = block != null;
+        HFileBlock newBlock = readNextDataBlock();
+        isValid = newBlock != null;
         if (isValid) {
-          updateCurrentBlock(block);
+          updateCurrentBlock(newBlock);
+        } else {
+          setNonSeekedState();
         }
       }
       return isValid;
@@ -1520,7 +1630,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
 
     @Override
     public Cell getCell() {
-      if (block == null) {
+      if (this.curBlock == null) {
         return null;
       }
       return seeker.getCell();
@@ -1539,7 +1649,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     }
 
     private void assertValidSeek() {
-      if (block == null) {
+      if (this.curBlock == null) {
         throw new NotSeekedException();
       }
     }
@@ -1548,9 +1658,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       return dataBlockEncoder.getFirstKeyCellInBlock(getEncodedBuffer(curBlock));
     }
 
+    @Override
     protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
         boolean rewind, Cell key, boolean seekBefore) throws IOException {
-      if (block == null || block.getOffset() != seekToBlock.getOffset()) {
+      if (this.curBlock == null
+          || this.curBlock.getOffset() != seekToBlock.getOffset()) {
         updateCurrentBlock(seekToBlock);
       } else if (rewind) {
         seeker.rewind();
@@ -1631,6 +1743,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     HFileContextBuilder builder = new HFileContextBuilder()
       .withIncludesMvcc(shouldIncludeMemstoreTS())
       .withHBaseCheckSum(true)
+      .withHFileName(this.getName())
       .withCompression(this.compressAlgo);
 
     // Check for any key material available

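The shipped() change above is the scanner-side half of the protocol: blocks handed out by the BlockCache stay pinned while cells built from them are being serialized, and once the results have been shipped the scanner returns them so their reference counts can drop. Below is a minimal, self-contained Java sketch of that lifecycle; the class name, the BlockCacheLike interface and the String cache keys are illustrative stand-ins for the HBase types, not the patch's actual code.

    import java.util.ArrayList;
    import java.util.List;

    final class BlockReturningScanner {
      interface BlockCacheLike {
        void returnBlock(String cacheKey);   // decrements the block's reference count
      }

      private final BlockCacheLike cache;
      private final List<String> prevBlockKeys = new ArrayList<>(); // blocks already scanned past
      private String curBlockKey;                                   // block currently positioned on

      BlockReturningScanner(BlockCacheLike cache) {
        this.cache = cache;
      }

      // Called whenever the scanner moves to a new block.
      void updateCurrentBlock(String cacheKey) {
        if (curBlockKey != null) {
          prevBlockKeys.add(curBlockKey);    // keep it referenced until results are shipped
        }
        curBlockKey = cacheKey;
      }

      // Called once the RPC layer has shipped the results built from these blocks.
      void shipped() {
        returnBlocks(false);
      }

      void close() {
        returnBlocks(true);
      }

      private void returnBlocks(boolean returnCurrent) {
        for (String key : prevBlockKeys) {
          cache.returnBlock(key);            // cache may evict once the count drops to zero
        }
        prevBlockKeys.clear();
        if (returnCurrent && curBlockKey != null) {
          cache.returnBlock(curBlockKey);
          curBlockKey = null;
        }
      }
    }

Judging from the hunks above, the real reader tracks the HFileBlock references themselves (curBlock via updateCurrBlockRef) rather than keys, but the accumulate-while-scanning, release-on-shipped()/close() pattern is the same idea.
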
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 806ddc9..dfbdc05 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -34,11 +34,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
-import com.google.common.base.Objects;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
@@ -49,6 +48,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -1090,4 +1090,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
   public BlockCache[] getBlockCaches() {
     return null;
   }
+
+  @Override
+  public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
+    // On-heap cache: there is no SHARED memory type here, so nothing to return
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
index f12f3b4..1eb7bfd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.Addressing;
@@ -259,7 +260,8 @@ public class MemcachedBlockCache implements BlockCache {
     public HFileBlock decode(CachedData d) {
       try {
         ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
-        return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true);
+        return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true,
+          MemoryType.EXCLUSIVE);
       } catch (IOException e) {
         LOG.warn("Error deserializing data from memcached",e);
       }
@@ -272,4 +274,9 @@ public class MemcachedBlockCache implements BlockCache {
     }
   }
 
+  @Override
+  public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
+    // Not doing reference counting. All blocks here are EXCLUSIVE
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index f05a255..1c331a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -58,16 +59,17 @@ import org.apache.hadoop.hbase.io.hfile.BlockPriority;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ConcurrentIndex;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -421,20 +423,19 @@ public class BucketCache implements BlockCache, HeapSize {
           // TODO : change this area - should be removed after server cells and
           // 12295 are available
           int len = bucketEntry.getLength();
-          ByteBuffer buf = ByteBuffer.allocate(len);
-          int lenRead = ioEngine.read(buf, bucketEntry.offset());
-          ByteBuff bb = new SingleByteBuff(buf);
-          if (lenRead != len) {
-            throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
-          }
+          Pair<ByteBuff, MemoryType> pair = ioEngine.read(bucketEntry.offset(), len);
+          ByteBuff bb = pair.getFirst();
           CacheableDeserializer<Cacheable> deserializer =
             bucketEntry.deserializerReference(this.deserialiserMap);
-          Cacheable cachedBlock = deserializer.deserialize(bb, true);
+          Cacheable cachedBlock = deserializer.deserialize(bb, true, pair.getSecond());
           long timeTaken = System.nanoTime() - start;
           if (updateCacheMetrics) {
             cacheStats.hit(caching);
             cacheStats.ioHit(timeTaken);
           }
+          if (pair.getSecond() == MemoryType.SHARED) {
+            bucketEntry.refCount.incrementAndGet();
+          }
           bucketEntry.access(accessCount.incrementAndGet());
           if (this.ioErrorStartTime > 0) {
             ioErrorStartTime = -1;
@@ -468,14 +469,59 @@ public class BucketCache implements BlockCache, HeapSize {
 
   @Override
   public boolean evictBlock(BlockCacheKey cacheKey) {
+    return evictBlock(cacheKey, true);
+  }
+
+  // Does not check the reference count; just evicts the block if it is found in
+  // the backing map
+  private boolean forceEvict(BlockCacheKey cacheKey) {
     if (!cacheEnabled) {
       return false;
     }
+    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
+    BucketEntry bucketEntry = backingMap.get(cacheKey);
+    if (bucketEntry == null) {
+      if (removedBlock != null) {
+        cacheStats.evicted(0);
+        return true;
+      } else {
+        return false;
+      }
+    }
+    IdLock.Entry lockEntry = null;
+    try {
+      lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
+      if (backingMap.remove(cacheKey, bucketEntry)) {
+        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
+      } else {
+        return false;
+      }
+    } catch (IOException ie) {
+      LOG.warn("Failed evicting block " + cacheKey);
+      return false;
+    } finally {
+      if (lockEntry != null) {
+        offsetLock.releaseLockEntry(lockEntry);
+      }
+    }
+    cacheStats.evicted(bucketEntry.getCachedTime());
+    return true;
+  }
+
+  private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
     RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
     if (removedBlock != null) {
       this.blockNumber.decrementAndGet();
       this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
     }
+    return removedBlock;
+  }
+
+  public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
+    if (!cacheEnabled) {
+      return false;
+    }
+    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
     BucketEntry bucketEntry = backingMap.get(cacheKey);
     if (bucketEntry == null) {
       if (removedBlock != null) {
@@ -488,10 +534,28 @@ public class BucketCache implements BlockCache, HeapSize {
     IdLock.Entry lockEntry = null;
     try {
       lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
-      if (backingMap.remove(cacheKey, bucketEntry)) {
-        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
+      int refCount = bucketEntry.refCount.get();
+      if (refCount == 0) {
+        if (backingMap.remove(cacheKey, bucketEntry)) {
+          blockEvicted(cacheKey, bucketEntry, removedBlock == null);
+        } else {
+          return false;
+        }
       } else {
-        return false;
+        if (!deletedBlock) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("This block " + cacheKey + " is still referred to by " + refCount
+                + " readers. Cannot be freed now");
+          }
+          return false;
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("This block " + cacheKey + " is still referred to by " + refCount
+                + " readers. Cannot be freed now. Hence marking it"
+                + " for eviction at a later point");
+          }
+          bucketEntry.markedForEvict = true;
+        }
       }
     } catch (IOException ie) {
       LOG.warn("Failed evicting block " + cacheKey);
@@ -1107,6 +1171,10 @@ public class BucketCache implements BlockCache, HeapSize {
     byte deserialiserIndex;
     private volatile long accessCounter;
     private BlockPriority priority;
+    // Set when eviction is requested but readers still hold the block; freed on the last returnBlock()
+    private volatile boolean markedForEvict;
+    private AtomicInteger refCount = new AtomicInteger(0);
+
     /**
      * Time this block was cached.  Presumes we are created just before we are added to the cache.
      */
@@ -1198,9 +1266,12 @@ public class BucketCache implements BlockCache, HeapSize {
     public long free(long toFree) {
       Map.Entry<BlockCacheKey, BucketEntry> entry;
       long freedBytes = 0;
+      // TODO avoid a cycling situation: if every block is still in use there is no way to free
+      // space. What to do then? Fail the caching attempt? Needs changes in the cacheBlock API?
       while ((entry = queue.pollLast()) != null) {
-        evictBlock(entry.getKey());
-        freedBytes += entry.getValue().getLength();
+        if (evictBlock(entry.getKey(), false)) {
+          freedBytes += entry.getValue().getLength();
+        }
         if (freedBytes >= toFree) {
           return freedBytes;
         }
@@ -1404,4 +1475,26 @@ public class BucketCache implements BlockCache, HeapSize {
   public BlockCache[] getBlockCaches() {
     return null;
   }
+
+  @Override
+  public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
+    if (block.getMemoryType() == MemoryType.SHARED) {
+      BucketEntry bucketEntry = backingMap.get(cacheKey);
+      if (bucketEntry != null) {
+        int refCount = bucketEntry.refCount.decrementAndGet();
+        if (bucketEntry.markedForEvict && refCount == 0) {
+          forceEvict(cacheKey);
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public int getRefCount(BlockCacheKey cacheKey) {
+    BucketEntry bucketEntry = backingMap.get(cacheKey);
+    if (bucketEntry != null) {
+      return bucketEntry.refCount.get();
+    }
+    return 0;
+  }
 }

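The BucketCache hunks above add a per-entry refCount plus a markedForEvict flag: a read of SHARED memory pins the entry, eviction is refused (or deferred, when the block must go away) while readers still hold it, and returnBlock() performs the deferred eviction once the last reader is done. The following stand-alone Java sketch models that protocol with simplified, hypothetical types (RefCountedCache, Entry, String keys); the real code additionally serializes these paths with the per-offset IdLock, which is omitted here for brevity.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;

    final class RefCountedCache {
      static final class Entry {
        final byte[] data;
        final AtomicInteger refCount = new AtomicInteger(0);
        volatile boolean markedForEvict;
        Entry(byte[] data) { this.data = data; }
      }

      private final ConcurrentMap<String, Entry> backingMap = new ConcurrentHashMap<>();

      void cacheBlock(String key, byte[] data) {
        backingMap.putIfAbsent(key, new Entry(data));
      }

      // A read hands out shared memory, so it pins the entry.
      byte[] getBlock(String key) {
        Entry e = backingMap.get(key);
        if (e == null) return null;
        e.refCount.incrementAndGet();
        return e.data;
      }

      // deletedBlock = true means the caller must get rid of the block (e.g. the file
      // is going away), so a busy block is marked and evicted lazily.
      boolean evictBlock(String key, boolean deletedBlock) {
        Entry e = backingMap.get(key);
        if (e == null) return false;
        if (e.refCount.get() == 0) {
          return backingMap.remove(key, e);
        }
        if (deletedBlock) {
          e.markedForEvict = true;   // freed later by the last returnBlock()
        }
        return false;                // still referenced; cannot free right now
      }

      // Readers call this once the data has been consumed or copied.
      void returnBlock(String key) {
        Entry e = backingMap.get(key);
        if (e == null) return;
        int refCount = e.refCount.decrementAndGet();
        if (e.markedForEvict && refCount == 0) {
          backingMap.remove(key, e);  // the deferred (forced) eviction
        }
      }
    }
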
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index 03c65de..092234b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -22,8 +22,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferArray;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * IO engine that stores data in memory using an array of ByteBuffers
@@ -64,24 +67,24 @@ public class ByteBufferIOEngine implements IOEngine {
     return false;
   }
 
-  /**
-   * Transfers data from the buffer array to the given byte buffer
-   * @param dstBuffer the given byte buffer into which bytes are to be written
-   * @param offset The offset in the ByteBufferArray of the first byte to be
-   *          read
-   * @return number of bytes read
-   * @throws IOException
-   */
-  @Override
-  public int read(ByteBuffer dstBuffer, long offset) throws IOException {
-    assert dstBuffer.hasArray();
-    return bufferArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(),
-        dstBuffer.arrayOffset());
-  }
-
   @Override
-  public ByteBuff read(long offset, int len) throws IOException {
-    return bufferArray.asSubByteBuff(offset, len);
+  public Pair<ByteBuff, MemoryType> read(long offset, int length) throws IOException {
+    // TODO : this allocate and copy will go away once we create BB backed cells
+    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
+    bufferArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(),
+      dstBuffer.arrayOffset());
+    // The buffer created here is meant to refer directly to the buffers in the actual buckets.
+    // Any cell referring to a block built from these buckets then points at a shared memory
+    // area, which, if evicted by the BucketCache, would corrupt the results. Hence the buffer's
+    // memory type is set to SHARED so that readers of this block are aware of this and take
+    // the necessary action to prevent eviction until the results are either consumed or
+    // copied.
+    if (dstBuffer.limit() != length) {
+      throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length
+          + " expected");
+    }
+    // TODO : to be removed - make it conditional
+    return new Pair<ByteBuff, MemoryType>(new SingleByteBuff(dstBuffer), MemoryType.SHARED);
   }
 
   /**

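The comments in the ByteBufferIOEngine and FileIOEngine hunks turn on a single distinction: whether the buffer handed back still views the engine's own memory (SHARED) or is a private copy (EXCLUSIVE). Here is a toy model of the two read flavours using plain java.nio types; the class and method names are hypothetical and the ReadResult pair stands in for Pair<ByteBuff, MemoryType>.

    import java.nio.ByteBuffer;

    final class MemoryTypedReads {
      enum MemoryType { SHARED, EXCLUSIVE }

      static final class ReadResult {
        final ByteBuffer buf;
        final MemoryType type;
        ReadResult(ByteBuffer buf, MemoryType type) { this.buf = buf; this.type = type; }
      }

      private final ByteBuffer backingStore;   // stands in for the bucket ByteBufferArray

      MemoryTypedReads(int capacity) {
        this.backingStore = ByteBuffer.allocateDirect(capacity);
      }

      // Returns a slice that still points into the engine's memory: eviction while a
      // reader holds this slice would corrupt results, so it is SHARED.
      ReadResult readShared(long offset, int length) {
        ByteBuffer dup = backingStore.duplicate();
        dup.position((int) offset).limit((int) offset + length);
        return new ReadResult(dup.slice(), MemoryType.SHARED);
      }

      // Copies the bytes out, so the engine may recycle its memory freely: EXCLUSIVE.
      ReadResult readExclusive(long offset, int length) {
        ByteBuffer copy = ByteBuffer.allocate(length);
        ByteBuffer src = backingStore.duplicate();
        src.position((int) offset).limit((int) offset + length);
        copy.put(src).flip();
        return new ReadResult(copy, MemoryType.EXCLUSIVE);
      }
    }
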
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index b1960c4..db589ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -26,8 +26,10 @@ import java.nio.channels.FileChannel;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -81,14 +83,25 @@ public class FileIOEngine implements IOEngine {
 
   /**
    * Transfers data from file to the given byte buffer
-   * @param dstBuffer the given byte buffer into which bytes are to be written
    * @param offset The offset in the file where the first byte to be read
+   * @param length Number of bytes to read; a buffer of this size is allocated
+   *               for reading from the file channel
    * @return number of bytes read
    * @throws IOException
    */
   @Override
-  public int read(ByteBuffer dstBuffer, long offset) throws IOException {
-    return fileChannel.read(dstBuffer, offset);
+  public Pair<ByteBuff, MemoryType> read(long offset, int length) throws IOException {
+    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
+    fileChannel.read(dstBuffer, offset);
+    // The buffer created from the fileChannel is filled by copying the data from the file,
+    // so it does not point at any shared memory. Even if the BucketCache evicts this block
+    // from the file, the data has already been copied and there is no risk of the results
+    // being corrupted before they are consumed.
+    if (dstBuffer.limit() != length) {
+      throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length
+          + " expected");
+    }
+    return new Pair<ByteBuff, MemoryType>(new SingleByteBuff(dstBuffer), MemoryType.EXCLUSIVE);
   }
 
   /**
@@ -129,14 +142,6 @@ public class FileIOEngine implements IOEngine {
   }
 
   @Override
-  public ByteBuff read(long offset, int len) throws IOException {
-    ByteBuffer dstBuffer = ByteBuffer.allocate(len);
-    int read = read(dstBuffer, offset);
-    dstBuffer.limit(read);
-    return new SingleByteBuff(dstBuffer);
-  }
-
-  @Override
   public void write(ByteBuff srcBuffer, long offset) throws IOException {
     // When caching block into BucketCache there will be single buffer backing for this HFileBlock.
     assert srcBuffer.hasArray();

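One NIO detail worth noting as a sketch: FileChannel.read(ByteBuffer, long) may return fewer bytes than requested, and a read never changes the buffer's limit, so comparing dstBuffer.limit() against the expected length (as the hunk above does) cannot detect a short read; looping on the channel's return value can. The helper below is a generic read-fully pattern under that assumption, not code from the patch, and the ChannelReads class name is made up for illustration.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    final class ChannelReads {
      static ByteBuffer readFully(FileChannel channel, long offset, int length) throws IOException {
        ByteBuffer dst = ByteBuffer.allocate(length);
        long pos = offset;
        while (dst.hasRemaining()) {
          int n = channel.read(dst, pos);
          if (n < 0) {
            throw new IOException("EOF after " + dst.position() + " of " + length + " bytes");
          }
          pos += n;
        }
        dst.flip();
        return dst;
      }
    }
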
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
index 862042f..3efb41b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * A class implementing IOEngine interface supports data services for
@@ -36,25 +38,14 @@ public interface IOEngine {
   boolean isPersistent();
 
   /**
-   * Transfers data from IOEngine to the given byte buffer
-   * @param dstBuffer the given byte buffer into which bytes are to be written
+   * Transfers data from IOEngine to a byte buffer
+   * @param length Number of bytes to read starting at the offset
    * @param offset The offset in the IO engine where the first byte to be read
-   * @return number of bytes read
+   * @return a Pair of the ByteBuff into which the data is read and its {@link MemoryType}
    * @throws IOException
    * @throws RuntimeException when the length of the ByteBuff read is less than 'len'
    */
-  int read(ByteBuffer dstBuffer, long offset) throws IOException;
-
-  /**
-   * Transfers data from IOEngine at the given offset to an MultiByteBuffer
-   * @param offset the offset from which the underlying buckets should be read
-   * @param len the length upto which the buckets should be read
-   * @return the MultiByteBuffer formed from the underlying ByteBuffers forming the
-   * buckets
-   * @throws IOException
-   * @throws RuntimeException when the length of the ByteBuff read is less than 'len'
-   */
-  ByteBuff read(long offset, int len) throws IOException;
+  Pair<ByteBuff, MemoryType> read(long offset, int length) throws IOException;
 
   /**
    * Transfers data from the given byte buffer to IOEngine

