Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3B78218F36 for ; Sun, 22 Nov 2015 04:32:38 +0000 (UTC) Received: (qmail 71703 invoked by uid 500); 22 Nov 2015 04:32:37 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 71660 invoked by uid 500); 22 Nov 2015 04:32:37 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 71651 invoked by uid 99); 22 Nov 2015 04:32:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 22 Nov 2015 04:32:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 707C4DFFC0; Sun, 22 Nov 2015 04:32:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stack@apache.org To: commits@hbase.apache.org Message-Id: <014866946d034d198915d4d4a76b925f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-14860 Improve BoundedByteBufferPool; make lockless (Hiroshi Ikeda) Date: Sun, 22 Nov 2015 04:32:37 +0000 (UTC) Repository: hbase Updated Branches: refs/heads/master 52edd83ba -> d8f2ac3e6 HBASE-14860 Improve BoundedByteBufferPool; make lockless (Hiroshi Ikeda) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d8f2ac3e Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d8f2ac3e Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d8f2ac3e Branch: refs/heads/master Commit: d8f2ac3e6ee26fc338f43c0512379a0ff98ea236 Parents: 52edd83 Author: stack 
Authored: Sat Nov 21 20:32:13 2015 -0800 Committer: stack Committed: Sat Nov 21 20:32:13 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/io/BoundedByteBufferPool.java | 143 +++++++++++++------ .../hbase/io/TestBoundedByteBufferPool.java | 20 +-- 2 files changed, 110 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d8f2ac3e/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java index aabbbc6..dfbdc5b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java @@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.io; import java.nio.ByteBuffer; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.BoundedArrayQueue; import com.google.common.annotations.VisibleForTesting; @@ -51,23 +51,49 @@ import com.google.common.annotations.VisibleForTesting; public class BoundedByteBufferPool { private static final Log LOG = LogFactory.getLog(BoundedByteBufferPool.class); + private final Queue buffers = new ConcurrentLinkedQueue(); + @VisibleForTesting - final Queue buffers; + int getQueueSize() { + return buffers.size(); + } + + private final int maxToCache; // Maximum size of a ByteBuffer to 
retain in pool private final int maxByteBufferSizeToCache; // A running average only it only rises, it never recedes + private final AtomicInteger runningAverageRef; + @VisibleForTesting - volatile int runningAverage; + int getRunningAverage() { + return runningAverageRef.get(); + } + + // Count (lower 32bit) and total capacity (upper 32bit) of pooled bytebuffers. + // Both are non-negative. They are equal to or larger than those of the actual + // queued buffers in any transition. + private final AtomicLong stateRef = new AtomicLong(); + + private static int toCountOfBuffers(long state) { + return (int)state; + } - // Scratch that keeps rough total size of pooled bytebuffers - private volatile int totalReservoirCapacity; + private static int toTotalCapacity(long state) { + return (int)(state >>> 32); + } - // For reporting - private AtomicLong allocations = new AtomicLong(0); + private static long toState(int countOfBuffers, int totalCapacity) { + return ((long)totalCapacity << 32) | countOfBuffers; + } - private ReentrantLock lock = new ReentrantLock(); + private static long subtractOneBufferFromState(long state, int capacity) { + return state - ((long)capacity << 32) - 1; + } + + // For reporting, only used in the log + private final AtomicLong allocationsRef = new AtomicLong(); /** * @param maxByteBufferSizeToCache @@ -77,56 +103,87 @@ public class BoundedByteBufferPool { public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize, final int maxToCache) { this.maxByteBufferSizeToCache = maxByteBufferSizeToCache; - this.runningAverage = initialByteBufferSize; - this.buffers = new BoundedArrayQueue(maxToCache); + this.runningAverageRef = new AtomicInteger(initialByteBufferSize); + this.maxToCache = maxToCache; } public ByteBuffer getBuffer() { - ByteBuffer bb = null; - lock.lock(); - try { - bb = this.buffers.poll(); - if (bb != null) { - this.totalReservoirCapacity -= bb.capacity(); - } - } finally { - lock.unlock(); - } + 
ByteBuffer bb = buffers.poll(); if (bb != null) { + long state; + while (true) { + long prevState = stateRef.get(); + state = subtractOneBufferFromState(prevState, bb.capacity()); + if (stateRef.compareAndSet(prevState, state)) { + break; + } + } // Clear sets limit == capacity. Postion == 0. bb.clear(); - } else { - bb = ByteBuffer.allocateDirect(this.runningAverage); - this.allocations.incrementAndGet(); + + if (LOG.isTraceEnabled()) { + int countOfBuffers = toCountOfBuffers(state); + int totalCapacity = toTotalCapacity(state); + LOG.trace("totalCapacity=" + totalCapacity + ", count=" + countOfBuffers); + } + return bb; } + + int runningAverage = runningAverageRef.get(); + bb = ByteBuffer.allocateDirect(runningAverage); + if (LOG.isTraceEnabled()) { - LOG.trace("runningAverage=" + this.runningAverage + - ", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size() + - ", alloctions=" + this.allocations.get()); + long allocations = allocationsRef.incrementAndGet(); + LOG.trace("runningAverage=" + runningAverage + ", alloctions=" + allocations); } return bb; } public void putBuffer(ByteBuffer bb) { // If buffer is larger than we want to keep around, just let it go. - if (bb.capacity() > this.maxByteBufferSizeToCache) return; - boolean success = false; - int average = 0; - lock.lock(); - try { - success = this.buffers.offer(bb); - if (success) { - this.totalReservoirCapacity += bb.capacity(); - average = this.totalReservoirCapacity / this.buffers.size(); // size will never be 0. 
+ if (bb.capacity() > maxByteBufferSizeToCache) { + return; + } + + int countOfBuffers; + int totalCapacity; + while (true) { + long prevState = stateRef.get(); + countOfBuffers = toCountOfBuffers(prevState); + if (countOfBuffers >= maxToCache) { + if (LOG.isWarnEnabled()) { + LOG.warn("At capacity: " + countOfBuffers); + } + return; + } + countOfBuffers++; + assert 0 < countOfBuffers && countOfBuffers <= maxToCache; + + totalCapacity = toTotalCapacity(prevState) + bb.capacity(); + if (totalCapacity < 0) { + if (LOG.isWarnEnabled()) { + LOG.warn("Overflowed total capacity."); + } + return; + } + + long state = toState(countOfBuffers, totalCapacity); + if (stateRef.compareAndSet(prevState, state)) { + break; } - } finally { - lock.unlock(); } - if (!success) { - LOG.warn("At capacity: " + this.buffers.size()); - } else { - if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) { - this.runningAverage = average; + + // ConcurrentLinkQueue#offer says "this method will never return false" + buffers.offer(bb); + + int runningAverageUpdate = Math.min( + totalCapacity / countOfBuffers, // size will never be 0. 
+ maxByteBufferSizeToCache); + while (true) { + int prev = runningAverageRef.get(); + if (prev >= runningAverageUpdate || // only rises, never recedes + runningAverageRef.compareAndSet(prev, runningAverageUpdate)) { + break; } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d8f2ac3e/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java index f993e41..bf4f0a5 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java @@ -53,38 +53,38 @@ public class TestBoundedByteBufferPool { this.reservoir.putBuffer(bb); this.reservoir.putBuffer(bb); this.reservoir.putBuffer(bb); - assertEquals(3, this.reservoir.buffers.size()); + assertEquals(3, this.reservoir.getQueueSize()); } @Test public void testGetPut() { ByteBuffer bb = this.reservoir.getBuffer(); assertEquals(initialByteBufferSize, bb.capacity()); - assertEquals(0, this.reservoir.buffers.size()); + assertEquals(0, this.reservoir.getQueueSize()); this.reservoir.putBuffer(bb); - assertEquals(1, this.reservoir.buffers.size()); + assertEquals(1, this.reservoir.getQueueSize()); // Now remove a buffer and don't put it back so reservoir is empty. this.reservoir.getBuffer(); - assertEquals(0, this.reservoir.buffers.size()); + assertEquals(0, this.reservoir.getQueueSize()); // Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works. 
// Need to add then remove, then get a new bytebuffer so reservoir internally is doing // allocation final int newCapacity = 2; this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity)); - assertEquals(1, reservoir.buffers.size()); + assertEquals(1, reservoir.getQueueSize()); this.reservoir.getBuffer(); - assertEquals(0, this.reservoir.buffers.size()); + assertEquals(0, this.reservoir.getQueueSize()); bb = this.reservoir.getBuffer(); assertEquals(newCapacity, bb.capacity()); // Assert that adding a too-big buffer won't happen - assertEquals(0, this.reservoir.buffers.size()); + assertEquals(0, this.reservoir.getQueueSize()); this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2)); - assertEquals(0, this.reservoir.buffers.size()); + assertEquals(0, this.reservoir.getQueueSize()); // Assert we can't add more than max allowed instances. for (int i = 0; i < maxToCache; i++) { this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize)); } - assertEquals(maxToCache, this.reservoir.buffers.size()); + assertEquals(maxToCache, this.reservoir.getQueueSize()); } @Test @@ -144,6 +144,6 @@ public class TestBoundedByteBufferPool { } // None of the BBs we got from pool is growing while in use. So we should not change the // runningAverage in pool - assertEquals(initialByteBufferSize, this.reservoir.runningAverage); + assertEquals(initialByteBufferSize, this.reservoir.getRunningAverage()); } }