hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject hbase git commit: HBASE-14860 Improve BoundedByteBufferPool; make lockless (Hiroshi Ikeda)
Date Sun, 22 Nov 2015 04:32:37 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 52edd83ba -> d8f2ac3e6


HBASE-14860 Improve BoundedByteBufferPool; make lockless (Hiroshi Ikeda)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d8f2ac3e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d8f2ac3e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d8f2ac3e

Branch: refs/heads/master
Commit: d8f2ac3e6ee26fc338f43c0512379a0ff98ea236
Parents: 52edd83
Author: stack <stack@apache.org>
Authored: Sat Nov 21 20:32:13 2015 -0800
Committer: stack <stack@apache.org>
Committed: Sat Nov 21 20:32:13 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/io/BoundedByteBufferPool.java  | 143 +++++++++++++------
 .../hbase/io/TestBoundedByteBufferPool.java     |  20 +--
 2 files changed, 110 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d8f2ac3e/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
index aabbbc6..dfbdc5b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
@@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.io;
 
 import java.nio.ByteBuffer;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.BoundedArrayQueue;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -51,23 +51,49 @@ import com.google.common.annotations.VisibleForTesting;
 public class BoundedByteBufferPool {
   private static final Log LOG = LogFactory.getLog(BoundedByteBufferPool.class);
 
+  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<ByteBuffer>();
+
   @VisibleForTesting
-  final Queue<ByteBuffer> buffers;
+  int getQueueSize() {
+    return buffers.size();
+  }
+
+  private final int maxToCache;
 
   // Maximum size of a ByteBuffer to retain in pool
   private final int maxByteBufferSizeToCache;
 
   // A running average only it only rises, it never recedes
+  private final AtomicInteger runningAverageRef;
+
   @VisibleForTesting
-  volatile int runningAverage;
+  int getRunningAverage() {
+    return runningAverageRef.get();
+  }
+
+  // Count (lower 32bit) and total capacity (upper 32bit) of pooled bytebuffers.
+  // Both are non-negative. They are equal to or larger than those of the actual
+  // queued buffers in any transition.
+  private final AtomicLong stateRef = new AtomicLong();
+
+  private static int toCountOfBuffers(long state) {
+    return (int)state;
+  }
 
-  // Scratch that keeps rough total size of pooled bytebuffers
-  private volatile int totalReservoirCapacity;
+  private static int toTotalCapacity(long state) {
+    return (int)(state >>> 32);
+  }
 
-  // For reporting
-  private AtomicLong allocations = new AtomicLong(0);
+  private static long toState(int countOfBuffers, int totalCapacity) {
+    return ((long)totalCapacity << 32) | countOfBuffers;
+  }
 
-  private ReentrantLock lock =  new ReentrantLock();
+  private static long subtractOneBufferFromState(long state, int capacity) {
+    return state - ((long)capacity << 32) - 1;
+  }
+
+  // For reporting, only used in the log
+  private final AtomicLong allocationsRef = new AtomicLong();
 
   /**
    * @param maxByteBufferSizeToCache
@@ -77,56 +103,87 @@ public class BoundedByteBufferPool {
   public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
       final int maxToCache) {
     this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
-    this.runningAverage = initialByteBufferSize;
-    this.buffers = new BoundedArrayQueue<ByteBuffer>(maxToCache);
+    this.runningAverageRef = new AtomicInteger(initialByteBufferSize);
+    this.maxToCache = maxToCache;
   }
 
   public ByteBuffer getBuffer() {
-    ByteBuffer bb = null;
-    lock.lock();
-    try {
-      bb = this.buffers.poll();
-      if (bb != null) {
-        this.totalReservoirCapacity -= bb.capacity();
-      }
-    } finally {
-      lock.unlock();
-    }
+    ByteBuffer bb = buffers.poll();
     if (bb != null) {
+      long state;
+      while (true) {
+        long prevState = stateRef.get();
+        state = subtractOneBufferFromState(prevState, bb.capacity());
+        if (stateRef.compareAndSet(prevState, state)) {
+          break;
+        }
+      }
       // Clear sets limit == capacity. Postion == 0.
       bb.clear();
-    } else {
-      bb = ByteBuffer.allocateDirect(this.runningAverage);
-      this.allocations.incrementAndGet();
+
+      if (LOG.isTraceEnabled()) {
+        int countOfBuffers = toCountOfBuffers(state);
+        int totalCapacity = toTotalCapacity(state);
+        LOG.trace("totalCapacity=" + totalCapacity + ", count=" + countOfBuffers);
+      }
+      return bb;
     }
+
+    int runningAverage = runningAverageRef.get();
+    bb = ByteBuffer.allocateDirect(runningAverage);
+
     if (LOG.isTraceEnabled()) {
-      LOG.trace("runningAverage=" + this.runningAverage +
-        ", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size() +
-        ", alloctions=" + this.allocations.get());
+      long allocations = allocationsRef.incrementAndGet();
+      LOG.trace("runningAverage=" + runningAverage + ", alloctions=" + allocations);
     }
     return bb;
   }
 
   public void putBuffer(ByteBuffer bb) {
     // If buffer is larger than we want to keep around, just let it go.
-    if (bb.capacity() > this.maxByteBufferSizeToCache) return;
-    boolean success = false;
-    int average = 0;
-    lock.lock();
-    try {
-      success = this.buffers.offer(bb);
-      if (success) {
-        this.totalReservoirCapacity += bb.capacity();
-        average = this.totalReservoirCapacity / this.buffers.size(); // size will never be 0.
+    if (bb.capacity() > maxByteBufferSizeToCache) {
+      return;
+    }
+
+    int countOfBuffers;
+    int totalCapacity;
+    while (true) {
+      long prevState = stateRef.get();
+      countOfBuffers = toCountOfBuffers(prevState);
+      if (countOfBuffers >= maxToCache) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("At capacity: " + countOfBuffers);
+        }
+        return;
+      }
+      countOfBuffers++;
+      assert 0 < countOfBuffers && countOfBuffers <= maxToCache;
+
+      totalCapacity = toTotalCapacity(prevState) + bb.capacity();
+      if (totalCapacity < 0) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Overflowed total capacity.");
+        }
+        return;
+      }
+
+      long state = toState(countOfBuffers, totalCapacity);
+      if (stateRef.compareAndSet(prevState, state)) {
+        break;
       }
-    } finally {
-      lock.unlock();
     }
-    if (!success) {
-      LOG.warn("At capacity: " + this.buffers.size());
-    } else {
-      if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
-        this.runningAverage = average;
+
+    // ConcurrentLinkQueue#offer says "this method will never return false"
+    buffers.offer(bb);
+
+    int runningAverageUpdate = Math.min(
+        totalCapacity / countOfBuffers, // size will never be 0.
+        maxByteBufferSizeToCache);
+    while (true) {
+      int prev = runningAverageRef.get();
+      if (prev >= runningAverageUpdate || // only rises, never recedes
+          runningAverageRef.compareAndSet(prev, runningAverageUpdate)) {
+        break;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8f2ac3e/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
index f993e41..bf4f0a5 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
@@ -53,38 +53,38 @@ public class TestBoundedByteBufferPool {
     this.reservoir.putBuffer(bb);
     this.reservoir.putBuffer(bb);
     this.reservoir.putBuffer(bb);
-    assertEquals(3, this.reservoir.buffers.size());
+    assertEquals(3, this.reservoir.getQueueSize());
   }
 
   @Test
   public void testGetPut() {
     ByteBuffer bb = this.reservoir.getBuffer();
     assertEquals(initialByteBufferSize, bb.capacity());
-    assertEquals(0, this.reservoir.buffers.size());
+    assertEquals(0, this.reservoir.getQueueSize());
     this.reservoir.putBuffer(bb);
-    assertEquals(1, this.reservoir.buffers.size());
+    assertEquals(1, this.reservoir.getQueueSize());
     // Now remove a buffer and don't put it back so reservoir is empty.
     this.reservoir.getBuffer();
-    assertEquals(0, this.reservoir.buffers.size());
+    assertEquals(0, this.reservoir.getQueueSize());
     // Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works.
     // Need to add then remove, then get a new bytebuffer so reservoir internally is doing
     // allocation
     final int newCapacity = 2;
     this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity));
-    assertEquals(1, reservoir.buffers.size());
+    assertEquals(1, reservoir.getQueueSize());
     this.reservoir.getBuffer();
-    assertEquals(0, this.reservoir.buffers.size());
+    assertEquals(0, this.reservoir.getQueueSize());
     bb = this.reservoir.getBuffer();
     assertEquals(newCapacity, bb.capacity());
     // Assert that adding a too-big buffer won't happen
-    assertEquals(0, this.reservoir.buffers.size());
+    assertEquals(0, this.reservoir.getQueueSize());
     this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2));
-    assertEquals(0, this.reservoir.buffers.size());
+    assertEquals(0, this.reservoir.getQueueSize());
     // Assert we can't add more than max allowed instances.
     for (int i = 0; i < maxToCache; i++) {
       this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize));
     }
-    assertEquals(maxToCache, this.reservoir.buffers.size());
+    assertEquals(maxToCache, this.reservoir.getQueueSize());
   }
 
   @Test
@@ -144,6 +144,6 @@ public class TestBoundedByteBufferPool {
     }
     // None of the BBs we got from pool is growing while in use. So we should not change the
     // runningAverage in pool
-    assertEquals(initialByteBufferSize, this.reservoir.runningAverage);
+    assertEquals(initialByteBufferSize, this.reservoir.getRunningAverage());
   }
 }


Mime
View raw message