hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject git commit: HBASE-11678 BucketCache ramCache fills heap after running a few hours
Date Thu, 07 Aug 2014 21:39:24 GMT
Repository: hbase
Updated Branches:
  refs/heads/0.98 2c55970ed -> a41eca43e


HBASE-11678 BucketCache ramCache fills heap after running a few hours


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a41eca43
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a41eca43
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a41eca43

Branch: refs/heads/0.98
Commit: a41eca43e34dee7570e94a4f62080b4b541fbe7a
Parents: 2c55970
Author: stack <stack@apache.org>
Authored: Thu Aug 7 14:23:01 2014 -0700
Committer: stack <stack@apache.org>
Committed: Thu Aug 7 14:39:06 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/io/hfile/LruBlockCache.java    |  53 +++-
 .../hbase/io/hfile/bucket/BucketAllocator.java  |   5 +-
 .../hbase/io/hfile/bucket/BucketCache.java      | 278 ++++++++++++-------
 .../io/hfile/bucket/TestBucketWriterThread.java | 170 ++++++++++++
 .../org/apache/hadoop/hbase/ipc/TestIPC.java    |   2 +-
 .../hadoop/hbase/ipc/TestProtoBufRpc.java       |   2 +-
 .../hbase/procedure/TestProcedureManager.java   |   2 +
 7 files changed, 400 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a41eca43/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 8b75751..bc3f8d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
- 
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -491,12 +491,9 @@ public class LruBlockCache implements BlockCache, HeapSize {
       if(bytesToFree <= 0) return;
 
       // Instantiate priority buckets
-      BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
-          singleSize());
-      BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
-          multiSize());
-      BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
-          memorySize());
+      BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize());
+      BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize());
+      BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize());
 
       // Scan entire map putting into appropriate buckets
       for(LruCachedBlock cachedBlock : map.values()) {
@@ -594,7 +591,6 @@ public class LruBlockCache implements BlockCache, HeapSize {
    * to configuration parameters and their relatives sizes.
    */
   private class BlockBucket implements Comparable<BlockBucket> {
-
     private LruCachedBlockQueue queue;
     private long totalSize = 0;
     private long bucketSize;
@@ -640,10 +636,14 @@ public class LruBlockCache implements BlockCache, HeapSize {
       if (that == null || !(that instanceof BlockBucket)){
         return false;
       }
-
-      return compareTo(( BlockBucket)that) == 0;
+      return compareTo((BlockBucket)that) == 0;
     }
 
+    @Override
+    public int hashCode() {
+      // Nothing distingushing about each instance unless I pass in a 'name' or something
+      return super.hashCode();
+    }
   }
 
   /**
@@ -702,18 +702,20 @@ public class LruBlockCache implements BlockCache, HeapSize {
       while (this.go) {
         synchronized(this) {
           try {
-            this.wait();
+            this.wait(1000 * 10/*Don't wait for ever*/);
           } catch(InterruptedException e) {}
         }
         LruBlockCache cache = this.cache.get();
-        if(cache == null) break;
+        if (cache == null) break;
         cache.evict();
       }
     }
 
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
+        justification="This is what we want")
     public void evict() {
       synchronized(this) {
-        this.notifyAll(); // FindBugs NN_NAKED_NOTIFY
+        this.notifyAll();
       }
     }
 
@@ -850,7 +852,30 @@ public class LruBlockCache implements BlockCache, HeapSize {
 
           @Override
           public int compareTo(CachedBlock other) {
-            return (int)(other.getOffset() - this.getOffset());
+            int diff = this.getFilename().compareTo(other.getFilename());
+            if (diff != 0) return diff;
+            diff = (int)(this.getOffset() - other.getOffset());
+            if (diff != 0) return diff;
+            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
+              throw new IllegalStateException("" + this.getCachedTime() + ", " +
+                other.getCachedTime());
+            }
+            return (int)(other.getCachedTime() - this.getCachedTime());
+          }
+
+          @Override
+          public int hashCode() {
+            return b.hashCode();
+          }
+
+          @Override
+          public boolean equals(Object obj) {
+            if (obj instanceof CachedBlock) {
+              CachedBlock cb = (CachedBlock)obj;
+              return compareTo(cb) == 0;
+            } else {
+              return false;
+            }
           }
         };
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a41eca43/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
index 255813b..1cfb408 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
@@ -414,7 +415,9 @@ public final class BucketAllocator {
     assert blockSize > 0;
     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
     if (bsi == null) {
-      throw new BucketAllocatorException("Allocation too big size=" + blockSize);
+      throw new BucketAllocatorException("Allocation too big size=" + blockSize +
+        "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
+        " to accomodate if size seems reasonable and you want it cached.");
     }
     long offset = bsi.allocateBlock();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a41eca43/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 7a462c8..b14bd8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -12,7 +12,7 @@
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
- 
+
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
@@ -69,25 +69,26 @@ import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * BucketCache uses {@link BucketAllocator} to allocate/free block, and use
  * {@link BucketCache#ramCache} and {@link BucketCache#backingMap} in order to
- * determine whether a given element hit. It could uses memory
- * {@link ByteBufferIOEngine} or file {@link FileIOEngine}to store/read the
- * block data.
- * 
- * Eviction is using similar algorithm as
+ * determine if a given element is in the cache. The bucket cache can use on-heap or
+ * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to
+ * store/read the block data.
+ *
+ * <p>Eviction is via a similar algorithm as used in
  * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
- * 
- * BucketCache could be used as mainly a block cache(see
- * {@link CombinedBlockCache}), combined with LruBlockCache to decrease CMS and
- * fragment by GC.
- * 
- * Also could be used as a secondary cache(e.g. using Fusionio to store block)
- * to enlarge cache space by
+ *
+ * <p>BucketCache can be used as mainly a block cache (see
+ * {@link CombinedBlockCache}), combined with LruBlockCache to decrease CMS GC and
+ * heap fragmentation.
+ *
+ * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
+ * blocks) to enlarge cache space via
  * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
  */
 @InterfaceAudience.Private
@@ -113,7 +114,8 @@ public class BucketCache implements BlockCache, HeapSize {
   IOEngine ioEngine;
 
   // Store the block in this map before writing it to cache
-  private ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> ramCache;
+  @VisibleForTesting
+  Map<BlockCacheKey, RAMQueueEntry> ramCache;
   // In this map, store the block's meta data like offset, length
   private ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;
 
@@ -124,8 +126,17 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   private volatile boolean cacheEnabled;
 
-  private ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = 
+  /**
+   * A list of writer queues.  We have a queue per {@link WriterThread} we have running.
+   * In other words, the work adding blocks to the BucketCache is divided up amongst the
+   * running WriterThreads.  Its done by taking hash of the cache key modulo queue count.
+   * WriterThread when it runs takes whatever has been recently added and 'drains' the entries
+   * to the BucketCache.  It then updates the ramCache and backingMap accordingly.
+   */
+  @VisibleForTesting
+  ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
       new ArrayList<BlockingQueue<RAMQueueEntry>>();
+  @VisibleForTesting
   WriterThread writerThreads[];
 
 
@@ -164,6 +175,7 @@ public class BucketCache implements BlockCache, HeapSize {
   private final int ioErrorsTolerationDuration;
   // 1 min
   public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
+
   // Start time of first IO error when reading or writing IO Engine, it will be
   // reset after a successful read/write.
   private volatile long ioErrorStartTime = -1;
@@ -172,7 +184,7 @@ public class BucketCache implements BlockCache, HeapSize {
    * A "sparse lock" implementation allowing to lock on a particular block
    * identified by offset. The purpose of this is to avoid freeing the block
    * which is being read.
-   * 
+   *
    * TODO:We could extend the IdLock to IdReadWriteLock for better.
    */
   private IdLock offsetLock = new IdLock();
@@ -207,7 +219,7 @@ public class BucketCache implements BlockCache, HeapSize {
     this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
       persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
   }
-  
+
   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
       int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
       throws FileNotFoundException, IOException {
@@ -252,6 +264,7 @@ public class BucketCache implements BlockCache, HeapSize {
     for (int i = 0; i < writerThreads.length; ++i) {
       writerThreads[i] = new WriterThread(writerQueues.get(i), i);
       writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
+      writerThreads[i].setDaemon(true);
       writerThreads[i].start();
     }
     // Run the statistics thread periodically to print the cache statistics log
@@ -264,6 +277,15 @@ public class BucketCache implements BlockCache, HeapSize {
       persistencePath + ", bucketAllocator=" + this.bucketAllocator);
   }
 
+  @VisibleForTesting
+  boolean isCacheEnabled() {
+    return this.cacheEnabled;
+  }
+
+  public long getMaxSize() {
+    return this.cacheCapacity;
+  }
+
   public String getIoEngine() {
     return ioEngine.toString();
   }
@@ -449,7 +471,7 @@ public class BucketCache implements BlockCache, HeapSize {
     cacheStats.evicted();
     return true;
   }
-  
+
   /*
    * Statistics thread.  Periodically prints the cache statistics to the log.
    */
@@ -466,7 +488,7 @@ public class BucketCache implements BlockCache, HeapSize {
       bucketCache.logStats();
     }
   }
-  
+
   public void logStats() {
     if (!LOG.isDebugEnabled()) return;
     // Log size
@@ -484,11 +506,11 @@ public class BucketCache implements BlockCache, HeapSize {
         "hits=" + cacheStats.getHitCount() + ", " +
         "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
         "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
-        "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : 
+        "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
           (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
         "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
         "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
-        "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : 
+        "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
           (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
         "evictions=" + cacheStats.getEvictionCount() + ", " +
         "evicted=" + cacheStats.getEvictedCount() + ", " +
@@ -523,45 +545,46 @@ public class BucketCache implements BlockCache, HeapSize {
    * Free the space if the used size reaches acceptableSize() or one size block
    * couldn't be allocated. When freeing the space, we use the LRU algorithm and
    * ensure there must be some blocks evicted
+   * @param why Why we are being called
    */
-  private void freeSpace() {
+  private void freeSpace(final String why) {
     // Ensure only one freeSpace progress at a time
     if (!freeSpaceLock.tryLock()) return;
     try {
       freeInProgress = true;
       long bytesToFreeWithoutExtra = 0;
-      /*
-       * Calculate free byte for each bucketSizeinfo
-       */
-      StringBuffer msgBuffer = new StringBuffer();
+      // Calculate free byte for each bucketSizeinfo
+      StringBuffer msgBuffer = LOG.isDebugEnabled()? new StringBuffer(): null;
       BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
       long[] bytesToFreeForBucket = new long[stats.length];
       for (int i = 0; i < stats.length; i++) {
         bytesToFreeForBucket[i] = 0;
-        long freeGoal = (long) Math.floor(stats[i].totalCount()
-            * (1 - DEFAULT_MIN_FACTOR));
+        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
         freeGoal = Math.max(freeGoal, 1);
         if (stats[i].freeCount() < freeGoal) {
-          bytesToFreeForBucket[i] = stats[i].itemSize()
-          * (freeGoal - stats[i].freeCount());
+          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
           bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
-          msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
+          if (msgBuffer != null) {
+            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
               + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
+          }
         }
       }
-      msgBuffer.append("Free for total="
-          + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
+      if (msgBuffer != null) {
+        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
+      }
 
       if (bytesToFreeWithoutExtra <= 0) {
         return;
       }
       long currentSize = bucketAllocator.getUsedSize();
       long totalSize=bucketAllocator.getTotalSize();
-      LOG.debug("Bucket cache free space started; Attempting to  " + msgBuffer.toString()
-          + " of current used=" + StringUtils.byteDesc(currentSize)
-          + ",actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get())
-          + ",total=" + StringUtils.byteDesc(totalSize));
-      
+      if (LOG.isDebugEnabled() && msgBuffer != null) {
+        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
+          " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
+          StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize));
+      }
+
       long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
           * (1 + DEFAULT_EXTRA_FREE_FACTOR));
 
@@ -619,8 +642,7 @@ public class BucketCache implements BlockCache, HeapSize {
       stats = bucketAllocator.getIndexStatistics();
       boolean needFreeForExtra = false;
       for (int i = 0; i < stats.length; i++) {
-        long freeGoal = (long) Math.floor(stats[i].totalCount()
-            * (1 - DEFAULT_MIN_FACTOR));
+        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
         freeGoal = Math.max(freeGoal, 1);
         if (stats[i].freeCount() < freeGoal) {
           needFreeForExtra = true;
@@ -636,8 +658,7 @@ public class BucketCache implements BlockCache, HeapSize {
         bucketQueue.add(bucketMulti);
 
         while ((bucketGroup = bucketQueue.poll()) != null) {
-          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed)
-              / remainingBuckets;
+          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
           bytesFreed += bucketGroup.free(bucketBytesToFree);
           remainingBuckets--;
         }
@@ -647,12 +668,14 @@ public class BucketCache implements BlockCache, HeapSize {
         long single = bucketSingle.totalSize();
         long multi = bucketMulti.totalSize();
         long memory = bucketMemory.totalSize();
-        LOG.debug("Bucket cache free space completed; " + "freed="
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Bucket cache free space completed; " + "freed="
             + StringUtils.byteDesc(bytesFreed) + ", " + "total="
             + StringUtils.byteDesc(totalSize) + ", " + "single="
             + StringUtils.byteDesc(single) + ", " + "multi="
             + StringUtils.byteDesc(multi) + ", " + "memory="
             + StringUtils.byteDesc(memory));
+        }
       }
 
     } catch (Throwable t) {
@@ -665,19 +688,20 @@ public class BucketCache implements BlockCache, HeapSize {
   }
 
   // This handles flushing the RAM cache to IOEngine.
-  private class WriterThread extends HasThread {
-    BlockingQueue<RAMQueueEntry> inputQueue;
-    final int threadNO;
-    boolean writerEnabled = true;
+  @VisibleForTesting
+  class WriterThread extends HasThread {
+    private final BlockingQueue<RAMQueueEntry> inputQueue;
+    private final int threadNO;
+    private volatile boolean writerEnabled = true;
 
     WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) {
       super();
       this.inputQueue = queue;
       this.threadNO = threadNO;
-      setDaemon(true);
     }
-    
+
     // Used for test
+    @VisibleForTesting
     void disableWriter() {
       this.writerEnabled = false;
     }
@@ -689,8 +713,7 @@ public class BucketCache implements BlockCache, HeapSize {
           try {
             try {
               // Blocks
-              entries.add(inputQueue.take());
-              inputQueue.drainTo(entries);
+              entries = getRAMQueueEntries(inputQueue, entries);
               synchronized (cacheWaitSignals[threadNO]) {
                 cacheWaitSignals[threadNO].notifyAll();
               }
@@ -709,80 +732,120 @@ public class BucketCache implements BlockCache, HeapSize {
     }
 
     /**
-     * Flush the entries in ramCache to IOEngine and add bucket entry to
-     * backingMap
-     * @param entries
+     * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
+     * Process all that are passed in even if failure being sure to remove from ramCache else we'll
+     * never undo the references and we'll OOME.
+     * @param entries Presumes list passed in here will be processed by this invocation only. No
+     * interference expected.
      * @throws InterruptedException
      */
-    private void doDrain(List<RAMQueueEntry> entries)
-        throws InterruptedException {
-      BucketEntry[] bucketEntries = new BucketEntry[entries.size()];
-      RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()];
-      int done = 0;
-      while (entries.size() > 0 && cacheEnabled) {
-        // Keep going in case we throw...
-        RAMQueueEntry ramEntry = null;
+    @VisibleForTesting
+    void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
+      if (entries.isEmpty()) return;
+      // This method is a little hard to follow. We run through the passed in entries and for each
+      // successful add, we add a non-null BucketEntry to the below bucketEntries.  Later we must
+      // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
+      // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
+      // filling ramCache.  We do the clean up by again running through the passed in entries
+      // doing extra work when we find a non-null bucketEntries corresponding entry.
+      final int size = entries.size();
+      BucketEntry[] bucketEntries = new BucketEntry[size];
+      // Index updated inside loop if success or if we can't succeed. We retry if cache is full
+      // when we go to add an entry by going around the loop again without upping the index.
+      int index = 0;
+      while (cacheEnabled && index < size) {
+        RAMQueueEntry re = null;
         try {
-          ramEntry = entries.remove(entries.size() - 1);
-          if (ramEntry == null) {
-            LOG.warn("Couldn't get the entry from RAM queue, who steals it?");
+          re = entries.get(index);
+          if (re == null) {
+            LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
+            index++;
             continue;
           }
-          BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine,
-              bucketAllocator, deserialiserMap, realCacheSize);
-          ramEntries[done] = ramEntry;
-          bucketEntries[done++] = bucketEntry;
+          BucketEntry bucketEntry =
+            re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
+          // Successfully added.  Up index and add bucketEntry. Clear io exceptions.
+          bucketEntries[index] = bucketEntry;
           if (ioErrorStartTime > 0) {
             ioErrorStartTime = -1;
           }
+          index++;
         } catch (BucketAllocatorException fle) {
-          LOG.warn("Failed allocating for block "
-              + (ramEntry == null ? "" : ramEntry.getKey()), fle);
+          LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
+          // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
+          bucketEntries[index] = null;
+          index++;
         } catch (CacheFullException cfe) {
+          // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
           if (!freeInProgress) {
-            freeSpace();
+            freeSpace("Full!");
           } else {
             Thread.sleep(50);
           }
         } catch (IOException ioex) {
+          // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
           LOG.error("Failed writing to bucket cache", ioex);
           checkIOErrorIsTolerated();
         }
       }
 
-      // Make sure that the data pages we have written are on the media before
-      // we update the map.
+      // Make sure data pages are written are on media before we update maps.
       try {
         ioEngine.sync();
       } catch (IOException ioex) {
-        LOG.error("Faild syncing IO engine", ioex);
+        LOG.error("Failed syncing IO engine", ioex);
         checkIOErrorIsTolerated();
         // Since we failed sync, free the blocks in bucket allocator
-        for (int i = 0; i < done; ++i) {
+        for (int i = 0; i < entries.size(); ++i) {
           if (bucketEntries[i] != null) {
             bucketAllocator.freeBlock(bucketEntries[i].offset());
+            bucketEntries[i] = null;
           }
         }
-        done = 0;
       }
 
-      for (int i = 0; i < done; ++i) {
+      // Now add to backingMap if successfully added to bucket cache.  Remove from ramCache if
+      // success or error.
+      for (int i = 0; i < size; ++i) {
+        BlockCacheKey key = entries.get(i).getKey();
+        // Only add if non-null entry.
         if (bucketEntries[i] != null) {
-          backingMap.put(ramEntries[i].getKey(), bucketEntries[i]);
+          backingMap.put(key, bucketEntries[i]);
         }
-        RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey());
+        // Always remove from ramCache even if we failed adding it to the block cache above.
+        RAMQueueEntry ramCacheEntry = ramCache.remove(key);
         if (ramCacheEntry != null) {
-          heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize());
+          heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
         }
       }
 
-      if (bucketAllocator.getUsedSize() > acceptableSize()) {
-        freeSpace();
+      long used = bucketAllocator.getUsedSize();
+      if (used > acceptableSize()) {
+        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
       }
+      return;
     }
   }
 
-  
+  /**
+   * Blocks until elements available in <code>q</code> then tries to grab as many as possible
+   * before returning.
+   * @param recepticle Where to stash the elements taken from queue. We clear before we use it
+   * just in case.
+   * @param q The queue to take from.
+   * @return <code>receptical laden with elements taken from the queue or empty if none found.
+   */
+  @VisibleForTesting
+  static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
+      final List<RAMQueueEntry> receptical)
+  throws InterruptedException {
+    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
+    // ok even if list grew to accommodate thousands.
+    receptical.clear();
+    receptical.add(q.take());
+    q.drainTo(receptical);
+    return receptical;
+  }
 
   private void persistToFile() throws IOException {
     assert !cacheEnabled;
@@ -860,11 +923,9 @@ public class BucketCache implements BlockCache, HeapSize {
   private void checkIOErrorIsTolerated() {
     long now = EnvironmentEdgeManager.currentTimeMillis();
     if (this.ioErrorStartTime > 0) {
-      if (cacheEnabled
-          && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
-        LOG.error("IO errors duration time has exceeded "
-            + ioErrorsTolerationDuration
-            + "ms, disabing cache, please check your IOEngine");
+      if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
+        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
+          "ms, disabing cache, please check your IOEngine");
         disableCache();
       }
     } else {
@@ -968,7 +1029,7 @@ public class BucketCache implements BlockCache, HeapSize {
           ++numEvicted;
       }
     }
-    
+
     return numEvicted;
   }
 
@@ -1043,7 +1104,7 @@ public class BucketCache implements BlockCache, HeapSize {
         this.priority = BlockPriority.MULTI;
       }
     }
-    
+
     public BlockPriority getPriority() {
       return this.priority;
     }
@@ -1125,7 +1186,8 @@ public class BucketCache implements BlockCache, HeapSize {
   /**
    * Block Entry stored in the memory with key,data and so on
    */
-  private static class RAMQueueEntry {
+  @VisibleForTesting
+  static class RAMQueueEntry {
     private BlockCacheKey key;
     private Cacheable data;
     private long accessTime;
@@ -1160,8 +1222,7 @@ public class BucketCache implements BlockCache, HeapSize {
       // This cacheable thing can't be serialized...
       if (len == 0) return null;
       long offset = bucketAllocator.allocateBlock(len);
-      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime,
-          inMemory);
+      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory);
       bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
       try {
         if (data instanceof HFileBlock) {
@@ -1182,7 +1243,7 @@ public class BucketCache implements BlockCache, HeapSize {
         bucketAllocator.freeBlock(offset);
         throw ioe;
       }
-      
+
       realCacheSize.addAndGet(len);
       return bucketEntry;
     }
@@ -1255,7 +1316,34 @@ public class BucketCache implements BlockCache, HeapSize {
 
           @Override
           public int compareTo(CachedBlock other) {
+<<<<<<< HEAD
             return (int)(this.getOffset() - other.getOffset());
+=======
+            int diff = this.getFilename().compareTo(other.getFilename());
+            if (diff != 0) return diff;
+            diff = (int)(this.getOffset() - other.getOffset());
+            if (diff != 0) return diff;
+            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
+              throw new IllegalStateException("" + this.getCachedTime() + ", " +
+                other.getCachedTime());
+            }
+            return (int)(other.getCachedTime() - this.getCachedTime());
+          }
+
+          @Override
+          public int hashCode() {
+            return e.getKey().hashCode();
+          }
+
+          @Override
+          public boolean equals(Object obj) {
+            if (obj instanceof CachedBlock) {
+              CachedBlock cb = (CachedBlock)obj;
+              return compareTo(cb) == 0;
+            } else {
+              return false;
+            }
+>>>>>>> 4997908... HBASE-11678 BucketCache ramCache fills heap after running a few hours
           }
         };
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a41eca43/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
new file mode 100644
index 0000000..d883661
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -0,0 +1,170 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category(SmallTests.class)
+public class TestBucketWriterThread {
+  private BucketCache bc;
+  private BucketCache.WriterThread wt;
+  private BlockingQueue<RAMQueueEntry> q;
+  private Cacheable plainCacheable;
+  private BlockCacheKey plainKey;
+
+  /**
+   * Set up variables and get BucketCache and WriterThread into state where tests can  manually
+   * control the running of WriterThread and BucketCache is empty.
+   * @throws Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Arbitrary capacity.
+    final int capacity = 16;
+    // Run with one writer thread only. Means there will be one writer queue only too.  We depend
+    // on this in below.
+    final int writerThreadsCount = 1;
+    this.bc = new BucketCache("heap", capacity, 1, new int [] {1}, writerThreadsCount,
+      capacity, null, 100/*Tolerate ioerrors for 100ms*/);
+    assertEquals(writerThreadsCount, bc.writerThreads.length);
+    assertEquals(writerThreadsCount, bc.writerQueues.size());
+    // Get reference to our single WriterThread instance.
+    this.wt = bc.writerThreads[0];
+    this.q = bc.writerQueues.get(0);
+    // On construction bucketcache WriterThread is blocked on the writer queue so it will not
+    // notice the disabling of the writer until after it has processed an entry.  Lets pass one
+    // through after setting disable flag on the writer. We want to disable the WriterThread so
+    // we can run the doDrain manually so we can watch it working and assert it doing right thing.
+    wt.disableWriter();
+    this.plainKey = new BlockCacheKey("f", 0);
+    this.plainCacheable = Mockito.mock(Cacheable.class);
+    bc.cacheBlock(this.plainKey, plainCacheable);
+    while(!bc.ramCache.isEmpty()) Threads.sleep(1);
+    assertTrue(q.isEmpty());
+    // Now writer thread should be disabled.
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (this.bc != null) this.bc.shutdown();
+  }
+
+  /**
+   * Test non-error case just works.
+   * @throws FileNotFoundException
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=30000)
+  public void testNonErrorCase()
+  throws FileNotFoundException, IOException, InterruptedException {
+    bc.cacheBlock(this.plainKey, this.plainCacheable);
+    doDrainOfOneEntry(this.bc, this.wt, this.q);
+  }
+
+  /**
+   * Pass through a too big entry and ensure it is cleared from queues and ramCache.
+   * Manually run the WriterThread.
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testTooBigEntry() throws InterruptedException {
+    Cacheable tooBigCacheable = Mockito.mock(Cacheable.class);
+    Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE);
+    this.bc.cacheBlock(this.plainKey, tooBigCacheable);
+    doDrainOfOneEntry(this.bc, this.wt, this.q);
+  }
+
+  /**
+   * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then
+   * put it back and process it.
+   * @throws IOException 
+   * @throws BucketAllocatorException 
+   * @throws CacheFullException 
+   * @throws InterruptedException 
+   */
+  @SuppressWarnings("unchecked")
+  @Test (timeout=30000)
+  public void testIOE()
+  throws CacheFullException, BucketAllocatorException, IOException, InterruptedException {
+    this.bc.cacheBlock(this.plainKey, plainCacheable);
+    RAMQueueEntry rqe = q.remove();
+    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
+    Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
+      writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
+        (UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any());
+    this.q.add(spiedRqe);
+    doDrainOfOneEntry(bc, wt, q);
+    // Cache disabled when ioes w/o ever healing.
+    assertTrue(!bc.isCacheEnabled());
+  }
+
+  /**
+   * Do Cache full exception
+   * @throws IOException 
+   * @throws BucketAllocatorException 
+   * @throws CacheFullException 
+   * @throws InterruptedException 
+   */
+  @Test (timeout=30000)
+  public void testCacheFullException()
+  throws CacheFullException, BucketAllocatorException, IOException, InterruptedException {
+    this.bc.cacheBlock(this.plainKey, plainCacheable);
+    RAMQueueEntry rqe = q.remove();
+    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
+    final CacheFullException cfe = new CacheFullException(0, 0);
+    BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
+    Mockito.doThrow(cfe).
+      doReturn(mockedBucketEntry).
+      when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
+        (UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any());
+    this.q.add(spiedRqe);
+    doDrainOfOneEntry(bc, wt, q);
+  }
+
+  private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt,
+      final BlockingQueue<RAMQueueEntry> q)
+  throws InterruptedException {
+    List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<RAMQueueEntry>(1));
+    wt.doDrain(rqes);
+    assertTrue(q.isEmpty());
+    assertTrue(bc.ramCache.isEmpty());
+    assertEquals(0, bc.heapSize());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a41eca43/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index 128a91a..57f71bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -158,7 +158,7 @@ public class TestIPC {
     TestRpcServer(RpcScheduler scheduler) throws IOException {
       super(null, "testRpcServer",
           Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
-        new InetSocketAddress("0.0.0.0", 0), CONF, scheduler);
+        new InetSocketAddress("localhost", 0), CONF, scheduler);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/a41eca43/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
index 335aaa6..fe4321f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
@@ -53,7 +53,7 @@ import com.google.protobuf.ServiceException;
  */
 @Category(MediumTests.class)
 public class TestProtoBufRpc {
-  public final static String ADDRESS = "0.0.0.0";
+  public final static String ADDRESS = "localhost";
   public static int PORT = 0;
   private InetSocketAddress isa;
   private Configuration conf;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a41eca43/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java
index 8d9ec24..12a1011 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.procedure;
 
+import static org.junit.Assert.assertArrayEquals;
+
 import java.io.IOException;
 import java.util.HashMap;
 


Mime
View raw message