hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhs...@apache.org
Subject [33/50] [abbrv] hbase git commit: HBASE-13301 Possible memory leak in BucketCache
Date Fri, 01 May 2015 15:28:19 GMT
HBASE-13301 Possible memory leak in BucketCache


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4f151444
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4f151444
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4f151444

Branch: refs/heads/hbase-11339
Commit: 4f151444b58ae85b93f76254961358932e0ffb9b
Parents: 71536bd
Author: zhangduo <zhangduo@wandoujia.com>
Authored: Sat Apr 11 10:43:43 2015 +0800
Committer: zhangduo <zhangduo@wandoujia.com>
Committed: Tue Apr 14 17:41:46 2015 +0800

----------------------------------------------------------------------
 .../hbase/io/hfile/bucket/BucketCache.java      | 182 +++++++++++--------
 .../hbase/io/hfile/bucket/CachedEntryQueue.java |  20 +-
 .../org/apache/hadoop/hbase/util/IdLock.java    |  16 ++
 .../hadoop/hbase/io/hfile/CacheTestUtils.java   |   6 +-
 .../hbase/io/hfile/bucket/TestBucketCache.java  |  87 ++++++---
 5 files changed, 196 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4f151444/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 7dda0e6..6a5c884 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -39,6 +39,7 @@ import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -109,13 +110,14 @@ public class BucketCache implements BlockCache, HeapSize {
   final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
 
   // Store/read block data
-  IOEngine ioEngine;
+  final IOEngine ioEngine;
 
   // Store the block in this map before writing it to cache
   @VisibleForTesting
-  Map<BlockCacheKey, RAMQueueEntry> ramCache;
+  final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
   // In this map, store the block's meta data like offset, length
-  private Map<BlockCacheKey, BucketEntry> backingMap;
+  @VisibleForTesting
+  ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;
 
   /**
    * Flag if the cache is enabled or not... We shut it off if there are IO
@@ -132,14 +134,14 @@ public class BucketCache implements BlockCache, HeapSize {
    * to the BucketCache.  It then updates the ramCache and backingMap accordingly.
    */
   @VisibleForTesting
-  ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
+  final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
       new ArrayList<BlockingQueue<RAMQueueEntry>>();
   @VisibleForTesting
-  WriterThread writerThreads[];
+  final WriterThread[] writerThreads;
 
   /** Volatile boolean to track if free space is in process or not */
   private volatile boolean freeInProgress = false;
-  private Lock freeSpaceLock = new ReentrantLock();
+  private final Lock freeSpaceLock = new ReentrantLock();
 
   private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();
 
@@ -152,17 +154,16 @@ public class BucketCache implements BlockCache, HeapSize {
   /** Cache access count (sequential ID) */
   private final AtomicLong accessCount = new AtomicLong(0);
 
-  private final Object[] cacheWaitSignals;
   private static final int DEFAULT_CACHE_WAIT_TIME = 50;
   // Used in test now. If the flag is false and the cache speed is very fast,
   // bucket cache will skip some blocks when caching. If the flag is true, we
   // will wait blocks flushed to IOEngine for some time when caching
   boolean wait_when_cache = false;
 
-  private BucketCacheStats cacheStats = new BucketCacheStats();
+  private final BucketCacheStats cacheStats = new BucketCacheStats();
 
-  private String persistencePath;
-  private long cacheCapacity;
+  private final String persistencePath;
+  private final long cacheCapacity;
   /** Approximate block size */
   private final long blockSize;
 
@@ -182,7 +183,8 @@ public class BucketCache implements BlockCache, HeapSize {
    *
    * TODO:We could extend the IdLock to IdReadWriteLock for better.
    */
-  private IdLock offsetLock = new IdLock();
+  @VisibleForTesting
+  final IdLock offsetLock = new IdLock();
 
   private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
       new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>()
{
@@ -216,7 +218,6 @@ public class BucketCache implements BlockCache, HeapSize {
       throws FileNotFoundException, IOException {
     this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
     this.writerThreads = new WriterThread[writerThreadNum];
-    this.cacheWaitSignals = new Object[writerThreadNum];
     long blockNumCapacity = capacity / blockSize;
     if (blockNumCapacity >= Integer.MAX_VALUE) {
       // Enough for about 32TB of cache!
@@ -231,7 +232,6 @@ public class BucketCache implements BlockCache, HeapSize {
     bucketAllocator = new BucketAllocator(capacity, bucketSizes);
     for (int i = 0; i < writerThreads.length; ++i) {
       writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
-      this.cacheWaitSignals[i] = new Object();
     }
 
     assert writerQueues.size() == writerThreads.length;
@@ -252,7 +252,7 @@ public class BucketCache implements BlockCache, HeapSize {
     final String threadName = Thread.currentThread().getName();
     this.cacheEnabled = true;
     for (int i = 0; i < writerThreads.length; ++i) {
-      writerThreads[i] = new WriterThread(writerQueues.get(i), i);
+      writerThreads[i] = new WriterThread(writerQueues.get(i));
       writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
       writerThreads[i].setDaemon(true);
     }
@@ -344,38 +344,39 @@ public class BucketCache implements BlockCache, HeapSize {
    * @param inMemory if block is in-memory
    * @param wait if true, blocking wait when queue is full
    */
-  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem,
-      boolean inMemory, boolean wait) {
-    if (!cacheEnabled)
+  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
+      boolean wait) {
+    if (!cacheEnabled) {
       return;
+    }
 
-    if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey))
+    if (backingMap.containsKey(cacheKey)) {
       return;
+    }
 
     /*
-     * Stuff the entry into the RAM cache so it can get drained to the
-     * persistent store
+     * Stuff the entry into the RAM cache so it can get drained to the persistent store
      */
-    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem,
-        accessCount.incrementAndGet(), inMemory);
-    ramCache.put(cacheKey, re);
+    RAMQueueEntry re =
+        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
+    if (ramCache.putIfAbsent(cacheKey, re) != null) {
+      return;
+    }
     int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
     BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
-    boolean successfulAddition = bq.offer(re);
-    if (!successfulAddition && wait) {
-      synchronized (cacheWaitSignals[queueNum]) {
-        try {
-          successfulAddition = bq.offer(re);
-          if (!successfulAddition) cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
+    boolean successfulAddition = false;
+    if (wait) {
+      try {
+        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
       }
+    } else {
       successfulAddition = bq.offer(re);
     }
     if (!successfulAddition) {
-        ramCache.remove(cacheKey);
-        failedBlockAdditions.incrementAndGet();
+      ramCache.remove(cacheKey);
+      failedBlockAdditions.incrementAndGet();
     } else {
       this.blockNumber.incrementAndGet();
       this.heapSize.addAndGet(cachedItem.heapSize());
@@ -394,11 +395,14 @@ public class BucketCache implements BlockCache, HeapSize {
   @Override
   public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
       boolean updateCacheMetrics) {
-    if (!cacheEnabled)
+    if (!cacheEnabled) {
       return null;
+    }
     RAMQueueEntry re = ramCache.get(key);
     if (re != null) {
-      if (updateCacheMetrics) cacheStats.hit(caching);
+      if (updateCacheMetrics) {
+        cacheStats.hit(caching);
+      }
       re.access(accessCount.incrementAndGet());
       return re.getData();
     }
@@ -408,6 +412,9 @@ public class BucketCache implements BlockCache, HeapSize {
       IdLock.Entry lockEntry = null;
       try {
         lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
+        // We can not read here even if backingMap does contain the given key because its
offset
+        // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
+        // existence here.
         if (bucketEntry.equals(backingMap.get(key))) {
           int len = bucketEntry.getLength();
           ByteBuffer bb = ByteBuffer.allocate(len);
@@ -438,13 +445,27 @@ public class BucketCache implements BlockCache, HeapSize {
         }
       }
     }
-    if (!repeat && updateCacheMetrics) cacheStats.miss(caching);
+    if (!repeat && updateCacheMetrics) {
+      cacheStats.miss(caching);
+    }
     return null;
   }
 
+  @VisibleForTesting
+  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber)
{
+    bucketAllocator.freeBlock(bucketEntry.offset());
+    realCacheSize.addAndGet(-1 * bucketEntry.getLength());
+    blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
+    if (decrementBlockNumber) {
+      this.blockNumber.decrementAndGet();
+    }
+  }
+
   @Override
   public boolean evictBlock(BlockCacheKey cacheKey) {
-    if (!cacheEnabled) return false;
+    if (!cacheEnabled) {
+      return false;
+    }
     RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
     if (removedBlock != null) {
       this.blockNumber.decrementAndGet();
@@ -462,13 +483,8 @@ public class BucketCache implements BlockCache, HeapSize {
     IdLock.Entry lockEntry = null;
     try {
       lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
-      if (bucketEntry.equals(backingMap.remove(cacheKey))) {
-        bucketAllocator.freeBlock(bucketEntry.offset());
-        realCacheSize.addAndGet(-1 * bucketEntry.getLength());
-        blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
-        if (removedBlock == null) {
-          this.blockNumber.decrementAndGet();
-        }
+      if (backingMap.remove(cacheKey, bucketEntry)) {
+        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
       } else {
         return false;
       }
@@ -705,13 +721,10 @@ public class BucketCache implements BlockCache, HeapSize {
   @VisibleForTesting
   class WriterThread extends HasThread {
     private final BlockingQueue<RAMQueueEntry> inputQueue;
-    private final int threadNO;
     private volatile boolean writerEnabled = true;
 
-    WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) {
-      super();
+    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
       this.inputQueue = queue;
-      this.threadNO = threadNO;
     }
 
     // Used for test
@@ -728,9 +741,6 @@ public class BucketCache implements BlockCache, HeapSize {
             try {
               // Blocks
               entries = getRAMQueueEntries(inputQueue, entries);
-              synchronized (cacheWaitSignals[threadNO]) {
-                cacheWaitSignals[threadNO].notifyAll();
-              }
             } catch (InterruptedException ie) {
               if (!cacheEnabled) break;
             }
@@ -755,7 +765,9 @@ public class BucketCache implements BlockCache, HeapSize {
      */
     @VisibleForTesting
     void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
-      if (entries.isEmpty()) return;
+      if (entries.isEmpty()) {
+        return;
+      }
       // This method is a little hard to follow. We run through the passed in entries and
for each
       // successful add, we add a non-null BucketEntry to the below bucketEntries.  Later
we must
       // do cleanup making sure we've cleared ramCache of all entries regardless of whether
we
@@ -830,6 +842,21 @@ public class BucketCache implements BlockCache, HeapSize {
         RAMQueueEntry ramCacheEntry = ramCache.remove(key);
         if (ramCacheEntry != null) {
           heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
+        } else if (bucketEntries[i] != null){
+          // Block should have already been evicted. Remove it and free space.
+          IdLock.Entry lockEntry = null;
+          try {
+            lockEntry = offsetLock.getLockEntry(bucketEntries[i].offset());
+            if (backingMap.remove(key, bucketEntries[i])) {
+              blockEvicted(key, bucketEntries[i], false);
+            }
+          } catch (IOException e) {
+            LOG.warn("failed to free space for " + key, e);
+          } finally {
+            if (lockEntry != null) {
+              offsetLock.releaseLockEntry(lockEntry);
+            }
+          }
         }
       }
 
@@ -1055,23 +1082,35 @@ public class BucketCache implements BlockCache, HeapSize {
    * up the long. Doubt we'll see devices this big for ages. Offsets are divided
    * by 256. So 5 bytes gives us 256TB or so.
    */
-  static class BucketEntry implements Serializable, Comparable<BucketEntry> {
+  static class BucketEntry implements Serializable {
     private static final long serialVersionUID = -6741504807982257534L;
+
+    // access counter comparator, descending order
+    static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>()
{
+
+      @Override
+      public int compare(BucketEntry o1, BucketEntry o2) {
+        long accessCounter1 = o1.accessCounter;
+        long accessCounter2 = o2.accessCounter;
+        return accessCounter1 < accessCounter2 ? 1 : accessCounter1 == accessCounter2
? 0 : -1;
+      }
+    };
+
     private int offsetBase;
     private int length;
     private byte offset1;
     byte deserialiserIndex;
-    private volatile long accessTime;
+    private volatile long accessCounter;
     private BlockPriority priority;
     /**
      * Time this block was cached.  Presumes we are created just before we are added to the
cache.
      */
     private final long cachedTime = System.nanoTime();
 
-    BucketEntry(long offset, int length, long accessTime, boolean inMemory) {
+    BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
       setOffset(offset);
       this.length = length;
-      this.accessTime = accessTime;
+      this.accessCounter = accessCounter;
       if (inMemory) {
         this.priority = BlockPriority.MEMORY;
       } else {
@@ -1110,10 +1149,10 @@ public class BucketCache implements BlockCache, HeapSize {
     }
 
     /**
-     * Block has been accessed. Update its local access time.
+     * Block has been accessed. Update its local access counter.
      */
-    public void access(long accessTime) {
-      this.accessTime = accessTime;
+    public void access(long accessCounter) {
+      this.accessCounter = accessCounter;
       if (this.priority == BlockPriority.SINGLE) {
         this.priority = BlockPriority.MULTI;
       }
@@ -1123,17 +1162,6 @@ public class BucketCache implements BlockCache, HeapSize {
       return this.priority;
     }
 
-    @Override
-    public int compareTo(BucketEntry that) {
-      if(this.accessTime == that.accessTime) return 0;
-      return this.accessTime < that.accessTime ? 1 : -1;
-    }
-
-    @Override
-    public boolean equals(Object that) {
-      return this == that;
-    }
-
     public long getCachedTime() {
       return cachedTime;
     }
@@ -1204,14 +1232,14 @@ public class BucketCache implements BlockCache, HeapSize {
   static class RAMQueueEntry {
     private BlockCacheKey key;
     private Cacheable data;
-    private long accessTime;
+    private long accessCounter;
     private boolean inMemory;
 
-    public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessTime,
+    public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
         boolean inMemory) {
       this.key = bck;
       this.data = data;
-      this.accessTime = accessTime;
+      this.accessCounter = accessCounter;
       this.inMemory = inMemory;
     }
 
@@ -1223,8 +1251,8 @@ public class BucketCache implements BlockCache, HeapSize {
       return key;
     }
 
-    public void access(long accessTime) {
-      this.accessTime = accessTime;
+    public void access(long accessCounter) {
+      this.accessCounter = accessCounter;
     }
 
     public BucketEntry writeToCache(final IOEngine ioEngine,
@@ -1236,7 +1264,7 @@ public class BucketCache implements BlockCache, HeapSize {
       // This cacheable thing can't be serialized...
       if (len == 0) return null;
       long offset = bucketAllocator.allocateBlock(len);
-      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory);
+      BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
       bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
       try {
         if (data instanceof HFileBlock) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4f151444/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
index b6954bb..0e33a56 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
@@ -54,23 +54,23 @@ public class CachedEntryQueue {
    */
   public CachedEntryQueue(long maxSize, long blockSize) {
     int initialSize = (int) (maxSize / blockSize);
-    if (initialSize == 0)
+    if (initialSize == 0) {
       initialSize++;
-    queue = MinMaxPriorityQueue
-        .orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
-          public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
-              Entry<BlockCacheKey, BucketEntry> entry2) {
-            return entry1.getValue().compareTo(entry2.getValue());
-          }
+    }
+    queue = MinMaxPriorityQueue.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>()
{
+
+      public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
+          Entry<BlockCacheKey, BucketEntry> entry2) {
+        return BucketEntry.COMPARATOR.compare(entry1.getValue(), entry2.getValue());
+      }
 
-        }).expectedSize(initialSize).create();
+    }).expectedSize(initialSize).create();
     cacheSize = 0;
     this.maxSize = maxSize;
   }
 
   /**
    * Attempt to add the specified entry to this queue.
-   * 
    * <p>
    * If the queue is smaller than the max size, or if the specified element is
    * ordered after the smallest element in the queue, the element will be added
@@ -83,7 +83,7 @@ public class CachedEntryQueue {
       cacheSize += entry.getValue().getLength();
     } else {
       BucketEntry head = queue.peek().getValue();
-      if (entry.getValue().compareTo(head) > 0) {
+      if (BucketEntry.COMPARATOR.compare(entry.getValue(), head) > 0) {
         cacheSize += entry.getValue().getLength();
         cacheSize -= head.getLength();
         if (cacheSize > maxSize) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4f151444/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
index b9d0983..fedf951 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
@@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Allows multiple concurrent clients to lock on a numeric id with a minimal
  * memory overhead. The intended usage is as follows:
@@ -119,4 +121,18 @@ public class IdLock {
     assert map.size() == 0;
   }
 
+  @VisibleForTesting
+  public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
+    for (Entry entry;;) {
+      entry = map.get(id);
+      if (entry != null) {
+        synchronized (entry) {
+          if (entry.numWaiters >= numWaiters) {
+            return;
+          }
+        }
+      }
+      Thread.sleep(100);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4f151444/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index 5ef8cf0..b0a2ba2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -247,11 +247,11 @@ public class CacheTestUtils {
     assertTrue(toBeTested.getStats().getEvictedCount() > 0);
   }
 
-  private static class ByteArrayCacheable implements Cacheable {
+  public static class ByteArrayCacheable implements Cacheable {
 
-    static final CacheableDeserializer<Cacheable> blockDeserializer = 
+    static final CacheableDeserializer<Cacheable> blockDeserializer =
       new CacheableDeserializer<Cacheable>() {
-      
+
       @Override
       public Cacheable deserialize(ByteBuffer b) throws IOException {
         int len = b.getInt();

http://git-wip-us.apache.org/repos/asf/hbase/blob/4f151444/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index d29be01..99f5657 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -27,13 +28,14 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.hadoop.hbase.testclassification.IOTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.IdLock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,24 +46,23 @@ import org.junit.runners.Parameterized;
 /**
  * Basic test of BucketCache.Puts and gets.
  * <p>
- * Tests will ensure that blocks' data correctness under several threads
- * concurrency
+ * Tests will ensure that blocks' data correctness under several threads concurrency
  */
 @RunWith(Parameterized.class)
-@Category({IOTests.class, SmallTests.class})
+@Category({ IOTests.class, SmallTests.class })
 public class TestBucketCache {
 
   private static final Random RAND = new Random();
 
-  @Parameterized.Parameters(name="{index}: blockSize={0}, bucketSizes={1}")
+  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
   public static Iterable<Object[]> data() {
     return Arrays.asList(new Object[][] {
-      { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
-      { 16 * 1024, new int[] {
-        2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
-        28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
-        128 * 1024 + 1024 } }
-    });
+        { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
+        {
+            16 * 1024,
+            new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
+                28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
+                128 * 1024 + 1024 } } });
   }
 
   @Parameterized.Parameter(0)
@@ -76,7 +77,7 @@ public class TestBucketCache {
   final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
   final int NUM_THREADS = 1000;
   final int NUM_QUERIES = 10000;
-  
+
   final long capacitySize = 32 * 1024 * 1024;
   final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
   final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
@@ -86,16 +87,16 @@ public class TestBucketCache {
   private class MockedBucketCache extends BucketCache {
 
     public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
-      int writerThreads, int writerQLen, String persistencePath)
-        throws FileNotFoundException, IOException {
+        int writerThreads, int writerQLen, String persistencePath) throws FileNotFoundException,
+        IOException {
       super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
-        persistencePath);
+          persistencePath);
       super.wait_when_cache = true;
     }
 
     @Override
-    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf,
-        boolean inMemory, boolean cacheDataInL1) {
+    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
+        boolean cacheDataInL1) {
       if (super.getBlock(cacheKey, true, false, true) != null) {
         throw new RuntimeException("Cached an already cached block");
       }
@@ -113,8 +114,9 @@ public class TestBucketCache {
 
   @Before
   public void setup() throws FileNotFoundException, IOException {
-    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
-      constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+    cache =
+        new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
+            constructedBlockSizes, writeThreads, writerQLen, persistencePath);
   }
 
   @After
@@ -142,7 +144,7 @@ public class TestBucketCache {
     // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks
until
     // the cache is completely filled.
     List<Integer> tmp = new ArrayList<Integer>(BLOCKSIZES);
-    for (int i = 0; !full; i++) {
+    while (!full) {
       Integer blockSize = null;
       try {
         blockSize = randFrom(tmp);
@@ -156,9 +158,7 @@ public class TestBucketCache {
     for (Integer blockSize : BLOCKSIZES) {
       BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
       IndexStatistics indexStatistics = bucketSizeInfo.statistics();
-      assertEquals(
-        "unexpected freeCount for " + bucketSizeInfo,
-        0, indexStatistics.freeCount());
+      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
     }
 
     for (long offset : allocations) {
@@ -182,4 +182,41 @@ public class TestBucketCache {
     cache.stopWriterThreads();
     CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
   }
-}
\ No newline at end of file
+
+  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then
writer
+  // threads will flush it to the bucket and put reference entry in backingMap.
+  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
+      Cacheable block) throws InterruptedException {
+    cache.cacheBlock(cacheKey, block);
+    while (!cache.backingMap.containsKey(cacheKey)) {
+      Thread.sleep(100);
+    }
+  }
+
+  @Test
+  public void testMemoryLeak() throws Exception {
+    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
+    cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
+        new byte[10]));
+    long lockId = cache.backingMap.get(cacheKey).offset();
+    IdLock.Entry lockEntry = cache.offsetLock.getLockEntry(lockId);
+    Thread evictThread = new Thread("evict-block") {
+
+      @Override
+      public void run() {
+        cache.evictBlock(cacheKey);
+      }
+
+    };
+    evictThread.start();
+    cache.offsetLock.waitForWaiters(lockId, 1);
+    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
+    cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
+        new byte[10]));
+    cache.offsetLock.releaseLockEntry(lockEntry);
+    evictThread.join();
+    assertEquals(1L, cache.getBlockCount());
+    assertTrue(cache.getCurrentSize() > 0L);
+    assertTrue("We should have a block!", cache.iterator().hasNext());
+  }
+}


Mime
View raw message