hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anoopsamj...@apache.org
Subject hbase git commit: HBASE-16407 Handle MemstoreChunkPool size when HeapMemoryManager tunes memory.
Date Mon, 29 Aug 2016 04:53:03 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 7eaba369e -> 950d547da


HBASE-16407 Handle MemstoreChunkPool size when HeapMemoryManager tunes memory.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/950d547d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/950d547d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/950d547d

Branch: refs/heads/master
Commit: 950d547dae684155020edb879a343bee1bf18e97
Parents: 7eaba36
Author: anoopsamjohn <anoopsamjohn@gmail.com>
Authored: Mon Aug 29 10:22:27 2016 +0530
Committer: anoopsamjohn <anoopsamjohn@gmail.com>
Committed: Mon Aug 29 10:22:27 2016 +0530

----------------------------------------------------------------------
 .../hbase/regionserver/HRegionServer.java       |  5 ++
 .../hbase/regionserver/HeapMemoryManager.java   | 24 ++++++++
 .../hbase/regionserver/MemStoreChunkPool.java   | 64 +++++++++++++++-----
 .../regionserver/TestMemStoreChunkPool.java     |  2 +-
 4 files changed, 78 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/950d547d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 257e724..bcd0c3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1463,6 +1463,11 @@ public class HRegionServer extends HasThread implements
         this, this.regionServerAccounting);
     if (this.hMemManager != null) {
       this.hMemManager.start(getChoreService());
+      MemStoreChunkPool chunkPool = MemStoreChunkPool.getPool(this.conf);
+      if (chunkPool != null) {
+        // Register it as HeapMemoryTuneObserver
+        this.hMemManager.registerTuneObserver(chunkPool);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/950d547d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
index f90125e..c360a60 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
@@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -87,6 +89,8 @@ public class HeapMemoryManager {
 
   private MetricsHeapMemoryManager metricsHeapMemoryManager;
 
+  private List<HeapMemoryTuneObserver> tuneObservers = new ArrayList<HeapMemoryTuneObserver>();
+
   public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
                 Server server, RegionServerAccounting regionServerAccounting) {
     BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
@@ -206,6 +210,10 @@ public class HeapMemoryManager {
     this.heapMemTunerChore.cancel(true);
   }
 
+  public void registerTuneObserver(HeapMemoryTuneObserver observer) {
+    this.tuneObservers.add(observer);
+  }
+
   // Used by the test cases.
   boolean isTunerOn() {
     return this.tunerOn;
@@ -351,6 +359,9 @@ public class HeapMemoryManager {
           blockCache.setMaxSize(newBlockCacheSize);
           globalMemStorePercent = memstoreSize;
           memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
+          for (HeapMemoryTuneObserver observer : tuneObservers) {
+            observer.onHeapMemoryTune(newMemstoreSize, newBlockCacheSize);
+          }
         }
       } else {
         metricsHeapMemoryManager.increaseTunerDoNothingCounter();
@@ -489,4 +500,17 @@ public class HeapMemoryManager {
       return needsTuning;
     }
   }
+
+  /**
+   * Every class that wants to observe heap memory tune actions must implement this interface.
+   */
+  public static interface HeapMemoryTuneObserver {
+
+    /**
+     * This method would be called by HeapMemoryManger when a heap memory tune action took place.
+     * @param newMemstoreSize The newly calculated global memstore size
+     * @param newBlockCacheSize The newly calculated global blockcache size
+     */
+    void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/950d547d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
index 6b34d75..b650d8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
@@ -31,13 +31,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * A pool of {@link HeapMemStoreLAB.Chunk} instances.
+ * A pool of {@link Chunk} instances.
  * 
  * MemStoreChunkPool caches a number of retired chunks for reusing, it could
  * decrease allocating bytes when writing, thereby optimizing the garbage
@@ -52,7 +53,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 @SuppressWarnings("javadoc")
 @InterfaceAudience.Private
-public class MemStoreChunkPool {
+public class MemStoreChunkPool implements HeapMemoryTuneObserver {
   private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
   final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize";
   final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize";
@@ -64,30 +65,32 @@ public class MemStoreChunkPool {
   /** Boolean whether we have disabled the memstore chunk pool entirely. */
   static boolean chunkPoolDisabled = false;
 
-  private final int maxCount;
+  private int maxCount;
 
   // A queue of reclaimed chunks
   private final BlockingQueue<PooledChunk> reclaimedChunks;
   private final int chunkSize;
+  private final float poolSizePercentage;
 
   /** Statistics thread schedule pool */
   private final ScheduledExecutorService scheduleThreadPool;
   /** Statistics thread */
   private static final int statThreadPeriod = 60 * 5;
-  private final AtomicLong createdChunkCount = new AtomicLong();
+  private final AtomicLong chunkCount = new AtomicLong();
   private final AtomicLong reusedChunkCount = new AtomicLong();
 
   MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount,
-      int initialCount) {
+      int initialCount, float poolSizePercentage) {
     this.maxCount = maxCount;
     this.chunkSize = chunkSize;
+    this.poolSizePercentage = poolSizePercentage;
     this.reclaimedChunks = new LinkedBlockingQueue<PooledChunk>();
     for (int i = 0; i < initialCount; i++) {
       PooledChunk chunk = new PooledChunk(chunkSize);
       chunk.init();
       reclaimedChunks.add(chunk);
     }
-    createdChunkCount.set(initialCount);
+    chunkCount.set(initialCount);
     final String n = Thread.currentThread().getName();
     scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
         .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
@@ -113,10 +116,10 @@ public class MemStoreChunkPool {
     } else {
       // Make a chunk iff we have not yet created the maxCount chunks
       while (true) {
-        long created = this.createdChunkCount.get();
+        long created = this.chunkCount.get();
         if (created < this.maxCount) {
           chunk = new PooledChunk(chunkSize);
-          if (this.createdChunkCount.compareAndSet(created, created + 1)) {
+          if (this.chunkCount.compareAndSet(created, created + 1)) {
             break;
           }
         } else {
@@ -132,11 +135,12 @@ public class MemStoreChunkPool {
    * skip the remaining chunks
    * @param chunks
    */
-  void putbackChunks(BlockingQueue<PooledChunk> chunks) {
-    assert reclaimedChunks.size() < this.maxCount;
+  synchronized void putbackChunks(BlockingQueue<PooledChunk> chunks) {
+    int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
     PooledChunk chunk = null;
-    while ((chunk = chunks.poll()) != null) {
+    while ((chunk = chunks.poll()) != null && toAdd > 0) {
       reclaimedChunks.add(chunk);
+      toAdd--;
     }
   }
 
@@ -145,9 +149,10 @@ public class MemStoreChunkPool {
    * skip it
    * @param chunk
    */
-  void putbackChunk(PooledChunk chunk) {
-    assert reclaimedChunks.size() < this.maxCount;
-    reclaimedChunks.add(chunk);
+  synchronized void putbackChunk(PooledChunk chunk) {
+    if (reclaimedChunks.size() < this.maxCount) {
+      reclaimedChunks.add(chunk);
+    }
   }
 
   int getPoolSize() {
@@ -174,7 +179,7 @@ public class MemStoreChunkPool {
 
     private void logStats() {
       if (!LOG.isDebugEnabled()) return;
-      long created = createdChunkCount.get();
+      long created = chunkCount.get();
       long reused = reusedChunkCount.get();
       long total = created + reused;
       LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
@@ -222,7 +227,8 @@ public class MemStoreChunkPool {
       int initialCount = (int) (initialCountPercentage * maxCount);
       LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
           + ", max count " + maxCount + ", initial count " + initialCount);
-      GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
+      GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount,
+          poolSizePercentage);
       return GLOBAL_INSTANCE;
     }
   }
@@ -241,4 +247,30 @@ public class MemStoreChunkPool {
       super(size);
     }
   }
+
+  @Override
+  public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
+    int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize);
+    if (newMaxCount != this.maxCount) {
+      // We need an adjustment in the chunks numbers
+      if (newMaxCount > this.maxCount) {
+        // Max chunks getting increased. Just change the variable. Later calls to getChunk() would
+        // create and add them to Q
+        LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount);
+        this.maxCount = newMaxCount;
+      } else {
+        // Max chunks getting decreased. We may need to clear off some of the pooled chunks now
+        // itself. If the extra chunks are serving already, do not pool those when we get them back
+        LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount);
+        this.maxCount = newMaxCount;
+        if (this.reclaimedChunks.size() > newMaxCount) {
+          synchronized (this) {
+            while (this.reclaimedChunks.size() > newMaxCount) {
+              this.reclaimedChunks.poll();
+            }
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/950d547d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index a426a07..cfbb098 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -203,7 +203,7 @@ public class TestMemStoreChunkPool {
     final int maxCount = 10;
     final int initialCount = 5;
     final int chunkSize = 10;
-    MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
+    MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount, 1);
     assertEquals(initialCount, pool.getPoolSize());
     assertEquals(maxCount, pool.getMaxCount());
     MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created.


Mime
View raw message