hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjus...@apache.org
Subject svn commit: r1461399 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/
Date Wed, 27 Mar 2013 03:23:38 GMT
Author: zjushch
Date: Wed Mar 27 03:23:38 2013
New Revision: 1461399

URL: http://svn.apache.org/r1461399
Log:
HBASE-8163 MemStoreChunkPool: An improvement for JAVA GC when using MSLAB

Added:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1461399&r1=1461398&r2=1461399&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
Wed Mar 27 03:23:38 2013
@@ -93,7 +93,9 @@ public class MemStore implements HeapSiz
   TimeRangeTracker timeRangeTracker;
   TimeRangeTracker snapshotTimeRangeTracker;
 
-  MemStoreLAB allocator;
+  MemStoreChunkPool chunkPool;
+  volatile MemStoreLAB allocator;
+  volatile MemStoreLAB snapshotAllocator;
 
 
 
@@ -121,9 +123,11 @@ public class MemStore implements HeapSiz
     snapshotTimeRangeTracker = new TimeRangeTracker();
     this.size = new AtomicLong(DEEP_OVERHEAD);
     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
-      this.allocator = new MemStoreLAB(conf);
+      this.chunkPool = MemStoreChunkPool.getPool(conf);
+      this.allocator = new MemStoreLAB(conf, chunkPool);
     } else {
       this.allocator = null;
+      this.chunkPool = null;
     }
   }
 
@@ -157,9 +161,10 @@ public class MemStore implements HeapSiz
           this.timeRangeTracker = new TimeRangeTracker();
           // Reset heap to not include any keys
           this.size.set(DEEP_OVERHEAD);
+          this.snapshotAllocator = this.allocator;
           // Reset allocator so we get a fresh buffer for the new memstore
           if (allocator != null) {
-            this.allocator = new MemStoreLAB(conf);
+            this.allocator = new MemStoreLAB(conf, chunkPool);
           }
         }
       }
@@ -188,6 +193,7 @@ public class MemStore implements HeapSiz
    */
   void clearSnapshot(final SortedSet<KeyValue> ss)
   throws UnexpectedException {
+    MemStoreLAB tmpAllocator = null;
     this.lock.writeLock().lock();
     try {
       if (this.snapshot != ss) {
@@ -200,9 +206,16 @@ public class MemStore implements HeapSiz
         this.snapshot = new KeyValueSkipListSet(this.comparator);
         this.snapshotTimeRangeTracker = new TimeRangeTracker();
       }
+      if (this.snapshotAllocator != null) {
+        tmpAllocator = this.snapshotAllocator;
+        this.snapshotAllocator = null;
+      }
     } finally {
       this.lock.writeLock().unlock();
     }
+    if (tmpAllocator != null) {
+      tmpAllocator.close();
+    }
   }
 
   /**
@@ -697,6 +710,10 @@ public class MemStore implements HeapSiz
     // the pre-calculated KeyValue to be returned by peek() or next()
     private KeyValue theNext;
 
+    // The allocator and snapshot allocator at the time of creating this scanner
+    volatile MemStoreLAB allocatorAtCreation;
+    volatile MemStoreLAB snapshotAllocatorAtCreation;
+
     /*
     Some notes...
 
@@ -723,6 +740,14 @@ public class MemStore implements HeapSiz
 
       kvsetAtCreation = kvset;
       snapshotAtCreation = snapshot;
+      if (allocator != null) {
+        this.allocatorAtCreation = allocator;
+        this.allocatorAtCreation.incScannerCount();
+      }
+      if (snapshotAllocator != null) {
+        this.snapshotAllocatorAtCreation = snapshotAllocator;
+        this.snapshotAllocatorAtCreation.incScannerCount();
+      }
     }
 
     private KeyValue getNext(Iterator<KeyValue> it) {
@@ -885,6 +910,15 @@ public class MemStore implements HeapSiz
 
       this.kvsetIt = null;
       this.snapshotIt = null;
+      
+      if (allocatorAtCreation != null) {
+        this.allocatorAtCreation.decScannerCount();
+        this.allocatorAtCreation = null;
+      }
+      if (snapshotAllocatorAtCreation != null) {
+        this.snapshotAllocatorAtCreation.decScannerCount();
+        this.snapshotAllocatorAtCreation = null;
+      }
 
       this.kvsetItRow = null;
       this.snapshotItRow = null;
@@ -907,7 +941,7 @@ public class MemStore implements HeapSiz
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (11 * ClassSize.REFERENCE));
+      ClassSize.OBJECT + (13 * ClassSize.REFERENCE));
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java?rev=1461399&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
(added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
Wed Mar 27 03:23:38 2013
@@ -0,0 +1,219 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Chunk;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A pool of {@link MemStoreLAB.Chunk} instances.
+ * 
+ * MemStoreChunkPool caches a number of retired chunks for reuse. This can
+ * reduce the bytes allocated when writing, thereby easing garbage
+ * collection in the JVM.
+ * 
+ * The pool instance is globally unique and can be obtained through
+ * {@link MemStoreChunkPool#getPool(Configuration)}
+ * 
+ * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocates
+ * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called
+ * when MemStore clears its snapshot for a flush.
+ */
+@InterfaceAudience.Private
+public class MemStoreChunkPool {
+  private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
+  final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize";
+  final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize";
+  final static float POOL_MAX_SIZE_DEFAULT = 0.0f;
+  final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f;
+
+  // Static reference to the MemStoreChunkPool
+  private static MemStoreChunkPool globalInstance;
+  /** Boolean whether we have disabled the memstore chunk pool entirely. */
+  static boolean chunkPoolDisabled = false;
+
+  private final int maxCount;
+
+  // A queue of reclaimed chunks
+  private final BlockingQueue<Chunk> reclaimedChunks;
+  private final int chunkSize;
+
+  /** Statistics thread schedule pool */
+  private final ScheduledExecutorService scheduleThreadPool;
+  /** Period of the statistics thread, in seconds */
+  private static final int statThreadPeriod = 60 * 5;
+  private AtomicLong createdChunkCount = new AtomicLong();
+  private AtomicLong reusedChunkCount = new AtomicLong();
+
+  MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount,
+      int initialCount) {
+    this.maxCount = maxCount;
+    this.chunkSize = chunkSize;
+    this.reclaimedChunks = new LinkedBlockingQueue<Chunk>();
+    for (int i = 0; i < initialCount; i++) {
+      Chunk chunk = new Chunk(chunkSize);
+      chunk.init();
+      reclaimedChunks.add(chunk);
+    }
+    final String n = Thread.currentThread().getName();
+    scheduleThreadPool = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat(n+"-MemStoreChunkPool Statistics")
+            .setDaemon(true).build());
+    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
+        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Poll a chunk from the pool, reset it if not null, else create a new chunk
+   * to return
+   * @return a chunk
+   */
+  Chunk getChunk() {
+    Chunk chunk = reclaimedChunks.poll();
+    if (chunk == null) {
+      chunk = new Chunk(chunkSize);
+      createdChunkCount.incrementAndGet();
+    } else {
+      chunk.reset();
+      reusedChunkCount.incrementAndGet();
+    }
+    return chunk;
+  }
+
+  /**
+   * Add the chunks to the pool; once the pool reaches its max size, the
+   * remaining chunks are skipped
+   * @param chunks
+   */
+  void putbackChunks(BlockingQueue<Chunk> chunks) {
+    int maxNumToPutback = this.maxCount - reclaimedChunks.size();
+    if (maxNumToPutback <= 0) {
+      return;
+    }
+    chunks.drainTo(reclaimedChunks, maxNumToPutback);
+  }
+
+  /**
+   * Add the chunk to the pool; if the pool has already reached its max size,
+   * the chunk is skipped
+   * @param chunk
+   */
+  void putbackChunk(Chunk chunk) {
+    if (reclaimedChunks.size() >= this.maxCount) {
+      return;
+    }
+    reclaimedChunks.add(chunk);
+  }
+
+  int getPoolSize() {
+    return this.reclaimedChunks.size();
+  }
+
+  /*
+   * Only used in testing
+   */
+  void clearChunks() {
+    this.reclaimedChunks.clear();
+  }
+
+  private static class StatisticsThread extends Thread {
+    MemStoreChunkPool mcp;
+
+    public StatisticsThread(MemStoreChunkPool mcp) {
+      super("MemStoreChunkPool.StatisticsThread");
+      setDaemon(true);
+      this.mcp = mcp;
+    }
+
+    @Override
+    public void run() {
+      mcp.logStats();
+    }
+  }
+
+  private void logStats() {
+    if (!LOG.isDebugEnabled()) return;
+    long created = createdChunkCount.get();
+    long reused = reusedChunkCount.get();
+    long total = created + reused;
+    LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+        + ",created chunk count=" + created
+        + ",reused chunk count=" + reused
+        + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
+            (float) reused / (float) total, 2)));
+  }
+
+  /**
+   * @param conf
+   * @return the global MemStoreChunkPool instance
+   */
+  static synchronized MemStoreChunkPool getPool(Configuration conf) {
+    if (globalInstance != null) return globalInstance;
+    if (chunkPoolDisabled) return null;
+
+
+    float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY,
+        POOL_MAX_SIZE_DEFAULT);
+    if (poolSizePercentage <= 0) {
+      chunkPoolDisabled = true;
+      return null;
+    }
+    if (poolSizePercentage > 1.0) {
+      throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY
+          + " must be between 0.0 and 1.0");
+    }
+    long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
+        .getMax();
+    long globalMemStoreLimit = MemStoreFlusher.globalMemStoreLimit(heapMax,
+        MemStoreFlusher.DEFAULT_UPPER, MemStoreFlusher.UPPER_KEY, conf);
+    int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY,
+        MemStoreLAB.CHUNK_SIZE_DEFAULT);
+    int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
+
+    float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY,
+        POOL_INITIAL_SIZE_DEFAULT);
+    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
+      throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY
+          + " must be between 0.0 and 1.0");
+    }
+
+    int initialCount = (int) (initialCountPercentage * maxCount);
+    LOG.info("Allocating MemStoreChunkPool with chunk size "
+        + StringUtils.byteDesc(chunkSize) + ", max count " + maxCount
+        + ", initial count " + initialCount);
+    globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount,
+        initialCount);
+    return globalInstance;
+  }
+
+}
\ No newline at end of file

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1461399&r1=1461398&r2=1461399&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
Wed Mar 27 03:23:38 2013
@@ -79,9 +79,9 @@ class MemStoreFlusher implements FlushRe
   protected final long globalMemStoreLimit;
   protected final long globalMemStoreLimitLowMark;
 
-  private static final float DEFAULT_UPPER = 0.4f;
+  static final float DEFAULT_UPPER = 0.4f;
   private static final float DEFAULT_LOWER = 0.35f;
-  private static final String UPPER_KEY =
+  static final String UPPER_KEY =
     "hbase.regionserver.global.memstore.upperLimit";
   private static final String LOWER_KEY =
     "hbase.regionserver.global.memstore.lowerLimit";

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java?rev=1461399&r1=1461398&r2=1461399&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
Wed Mar 27 03:23:38 2013
@@ -18,11 +18,15 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -49,6 +53,8 @@ import com.google.common.base.Preconditi
 @InterfaceAudience.Private
 public class MemStoreLAB {
   private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
+  // A queue of chunks contained by this memstore
+  private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
 
   final static String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
   final static int CHUNK_SIZE_DEFAULT = 2048 * 1024;
@@ -58,13 +64,30 @@ public class MemStoreLAB {
   final static int MAX_ALLOC_DEFAULT = 256  * 1024; // allocs bigger than this don't go through allocator
   final int maxAlloc;
 
+  private final MemStoreChunkPool chunkPool;
+
+  // This flag marks this instance as closed; it's set when clearing the
+  // snapshot of the memstore
+  private volatile boolean closed = false;
+  // This flag marks the chunks as reclaimed; it's set when putting chunks
+  // back to the pool
+  private AtomicBoolean reclaimed = new AtomicBoolean(false);
+  // Current count of open scanners which are reading data from this MemStoreLAB
+  private final AtomicInteger openScannerCount = new AtomicInteger();
+
+  // Used in testing
   public MemStoreLAB() {
     this(new Configuration());
   }
 
-  public MemStoreLAB(Configuration conf) {
+  private MemStoreLAB(Configuration conf) {
+    this(conf, MemStoreChunkPool.getPool(conf));
+  }
+
+  public MemStoreLAB(Configuration conf, MemStoreChunkPool pool) {
     chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
     maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
+    this.chunkPool = pool;
 
     // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
     Preconditions.checkArgument(
@@ -105,6 +128,38 @@ public class MemStoreLAB {
   }
 
   /**
+   * Close this instance since it won't be used any more, and try to put the
+   * chunks back to the pool
+   */
+  void close() {
+    this.closed = true;
+    // We can put the chunks back to the pool for reuse only when there is no
+    // open scanner which will read their data
+    if (chunkPool != null && openScannerCount.get() == 0
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.chunkQueue);
+    }
+  }
+
+  /**
+   * Called when opening a scanner on the data of this MemStoreLAB
+   */
+  void incScannerCount() {
+    this.openScannerCount.incrementAndGet();
+  }
+
+  /**
+   * Called when closing a scanner on the data of this MemStoreLAB
+   */
+  void decScannerCount() {
+    int count = this.openScannerCount.decrementAndGet();
+    if (chunkPool != null && count == 0 && this.closed
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.chunkQueue);
+    }
+  }
+
+  /**
    * Try to retire the current chunk if it is still
    * <code>c</code>. Postcondition is that curChunk.get()
    * != c
@@ -134,12 +189,15 @@ public class MemStoreLAB {
       // No current chunk, so we want to allocate one. We race
       // against other allocators to CAS in an uninitialized chunk
       // (which is cheap to allocate)
-      c = new Chunk(chunkSize);
+      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
       if (curChunk.compareAndSet(null, c)) {
         // we won race - now we need to actually do the expensive
         // allocation step
         c.init();
+        this.chunkQueue.add(c);
         return c;
+      } else if (chunkPool != null) {
+        chunkPool.putbackChunk(c);
       }
       // someone else won race - that's fine, we'll try to grab theirs
       // in the next iteration of the loop.
@@ -149,7 +207,7 @@ public class MemStoreLAB {
   /**
    * A chunk of memory out of which allocations are sliced.
    */
-  private static class Chunk {
+  static class Chunk {
     /** Actual underlying data */
     private byte[] data;
 
@@ -172,7 +230,7 @@ public class MemStoreLAB {
      * this is cheap.
      * @param size in bytes
      */
-    private Chunk(int size) {
+    Chunk(int size) {
       this.size = size;
     }
 
@@ -184,7 +242,9 @@ public class MemStoreLAB {
     public void init() {
       assert nextFreeOffset.get() == UNINITIALIZED;
       try {
-        data = new byte[size];
+        if (data == null) {
+          data = new byte[size];
+        }
       } catch (OutOfMemoryError e) {
         boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
         assert failInit; // should be true.
@@ -200,6 +260,16 @@ public class MemStoreLAB {
     }
 
     /**
+     * Reset the offset to UNINITIALIZED before reusing an old chunk
+     */
+    void reset() {
+      if (nextFreeOffset.get() != UNINITIALIZED) {
+        nextFreeOffset.set(UNINITIALIZED);
+        allocCount.set(0);
+      }
+    }
+
+    /**
      * Try to allocate <code>size</code> bytes from the chunk.
      * @return the offset of the successful allocation, or -1 to indicate not-enough-space
      */

Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java?rev=1461399&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
(added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
Wed Mar 27 03:23:38 2013
@@ -0,0 +1,201 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.rmi.UnexpectedException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test the {@link MemStoreChunkPool} class
+ */
+@Category(SmallTests.class)
+public class TestMemStoreChunkPool {
+  private final static Configuration conf = new Configuration();
+  private static MemStoreChunkPool chunkPool;
+  private static boolean chunkPoolDisabledBeforeTest;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf.setBoolean(MemStore.USEMSLAB_KEY, true);
+    conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
+    chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled;
+    MemStoreChunkPool.chunkPoolDisabled = false;
+    chunkPool = MemStoreChunkPool.getPool(conf);
+    assertTrue(chunkPool != null);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    MemStoreChunkPool.chunkPoolDisabled = chunkPoolDisabledBeforeTest;
+  }
+
+  @Before
+  public void tearDown() throws Exception {
+    chunkPool.clearChunks();
+  }
+
+  @Test
+  public void testReusingChunks() {
+    Random rand = new Random();
+    MemStoreLAB mslab = new MemStoreLAB(conf, chunkPool);
+    int expectedOff = 0;
+    byte[] lastBuffer = null;
+    // Randomly allocate some bytes
+    for (int i = 0; i < 100; i++) {
+      int size = rand.nextInt(1000);
+      Allocation alloc = mslab.allocateBytes(size);
+
+      if (alloc.getData() != lastBuffer) {
+        expectedOff = 0;
+        lastBuffer = alloc.getData();
+      }
+      assertEquals(expectedOff, alloc.getOffset());
+      assertTrue("Allocation " + alloc + " overruns buffer", alloc.getOffset()
+          + size <= alloc.getData().length);
+      expectedOff += size;
+    }
+    // chunks will be put back to pool after close
+    mslab.close();
+    int chunkCount = chunkPool.getPoolSize();
+    assertTrue(chunkCount > 0);
+    // reconstruct mslab
+    mslab = new MemStoreLAB(conf, chunkPool);
+    // chunk should be got from the pool, so we can reuse it.
+    mslab.allocateBytes(1000);
+    assertEquals(chunkCount - 1, chunkPool.getPoolSize());
+  }
+
+  @Test
+  public void testPuttingBackChunksAfterFlushing() throws UnexpectedException {
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] qf4 = Bytes.toBytes("testqualifier4");
+    byte[] qf5 = Bytes.toBytes("testqualifier5");
+    byte[] val = Bytes.toBytes("testval");
+
+    MemStore memstore = new MemStore();
+
+    // Setting up memstore
+    memstore.add(new KeyValue(row, fam, qf1, val));
+    memstore.add(new KeyValue(row, fam, qf2, val));
+    memstore.add(new KeyValue(row, fam, qf3, val));
+
+    // Creating a snapshot
+    memstore.snapshot();
+    KeyValueSkipListSet snapshot = memstore.getSnapshot();
+    assertEquals(3, memstore.snapshot.size());
+
+    // Adding value to "new" memstore
+    assertEquals(0, memstore.kvset.size());
+    memstore.add(new KeyValue(row, fam, qf4, val));
+    memstore.add(new KeyValue(row, fam, qf5, val));
+    assertEquals(2, memstore.kvset.size());
+    memstore.clearSnapshot(snapshot);
+
+    int chunkCount = chunkPool.getPoolSize();
+    assertTrue(chunkCount > 0);
+
+  }
+
+  @Test
+  public void testPuttingBackChunksWithOpeningScanner()
+      throws UnexpectedException {
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] qf4 = Bytes.toBytes("testqualifier4");
+    byte[] qf5 = Bytes.toBytes("testqualifier5");
+    byte[] qf6 = Bytes.toBytes("testqualifier6");
+    byte[] qf7 = Bytes.toBytes("testqualifier7");
+    byte[] val = Bytes.toBytes("testval");
+
+    MemStore memstore = new MemStore();
+
+    // Setting up memstore
+    memstore.add(new KeyValue(row, fam, qf1, val));
+    memstore.add(new KeyValue(row, fam, qf2, val));
+    memstore.add(new KeyValue(row, fam, qf3, val));
+
+    // Creating a snapshot
+    memstore.snapshot();
+    KeyValueSkipListSet snapshot = memstore.getSnapshot();
+    assertEquals(3, memstore.snapshot.size());
+
+    // Adding value to "new" memstore
+    assertEquals(0, memstore.kvset.size());
+    memstore.add(new KeyValue(row, fam, qf4, val));
+    memstore.add(new KeyValue(row, fam, qf5, val));
+    assertEquals(2, memstore.kvset.size());
+
+    // opening scanner before clear the snapshot
+    List<KeyValueScanner> scanners = memstore.getScanners();
+    // Chunks shouldn't be put back to the pool, since some scanners are still
+    // open on their data
+    memstore.clearSnapshot(snapshot);
+
+    assertTrue(chunkPool.getPoolSize() == 0);
+
+    // Chunks will be put back to the pool after closing the scanners
+    for (KeyValueScanner scanner : scanners) {
+      scanner.close();
+    }
+    assertTrue(chunkPool.getPoolSize() > 0);
+
+    // clear chunks
+    chunkPool.clearChunks();
+
+    // Creating another snapshot
+    memstore.snapshot();
+    snapshot = memstore.getSnapshot();
+    // Adding more value
+    memstore.add(new KeyValue(row, fam, qf6, val));
+    memstore.add(new KeyValue(row, fam, qf7, val));
+    // opening scanners
+    scanners = memstore.getScanners();
+    // close scanners before clear the snapshot
+    for (KeyValueScanner scanner : scanners) {
+      scanner.close();
+    }
+    // Since there is no open scanner, the chunks of the snapshot should be put
+    // back to the pool
+    memstore.clearSnapshot(snapshot);
+    assertTrue(chunkPool.getPoolSize() > 0);
+  }
+
+}
\ No newline at end of file



Mime
View raw message