hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [06/23] hbase git commit: HBASE-17805 We should remove BoundedByteBufferPool because it is replaced by ByteBufferPool
Date Mon, 27 Mar 2017 20:09:04 GMT
HBASE-17805 We should remove BoundedByteBufferPool because it is replaced by ByteBufferPool


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7bb0624b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7bb0624b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7bb0624b

Branch: refs/heads/hbase-12439
Commit: 7bb0624bab68d7dd136d0cd54a8f0c74790aca31
Parents: 9c8f02e
Author: CHIA-PING TSAI <chia7712@gmail.com>
Authored: Mon Mar 20 09:11:53 2017 +0800
Committer: CHIA-PING TSAI <chia7712@gmail.com>
Committed: Tue Mar 21 09:38:02 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/io/BoundedByteBufferPool.java  | 194 -------------------
 .../hbase/io/TestBoundedByteBufferPool.java     | 167 ----------------
 2 files changed, 361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb0624b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
deleted file mode 100644
index 7bce0e5..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer.
- * This pool keeps an upper bound on the count of ByteBuffers in the pool and on the maximum size
- * of ByteBuffer that it will retain (Hence the pool is 'bounded' as opposed to, say,
- * Hadoop's ElasticByteBuffferPool).
- * If a ByteBuffer is bigger than the configured threshold, we will just let the ByteBuffer go
- * rather than add it to the pool. If more ByteBuffers than the configured maximum instances,
- * we will not add the passed ByteBuffer to the pool; we will just drop it
- * (we will log a WARN in this case that we are at capacity).
- *
- * <p>The intended use case is a reservoir of bytebuffers that an RPC can reuse; buffers tend to
- * achieve a particular 'run' size over time give or take a few extremes. Set TRACE level on this
- * class for a couple of seconds to get reporting on how it is running when deployed.
- *
- * <p>This pool returns off heap ByteBuffers.
- *
- * <p>This class is thread safe.
- */
-@InterfaceAudience.Private
-public class BoundedByteBufferPool {
-  private static final Log LOG = LogFactory.getLog(BoundedByteBufferPool.class);
-
-  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
-
-  @VisibleForTesting
-  int getQueueSize() {
-    return buffers.size();
-  }
-
-  private final int maxToCache;
-
-  // Maximum size of a ByteBuffer to retain in pool
-  private final int maxByteBufferSizeToCache;
-
-  // A running average only it only rises, it never recedes
-  private final AtomicInteger runningAverageRef;
-
-  @VisibleForTesting
-  int getRunningAverage() {
-    return runningAverageRef.get();
-  }
-
-  // Count (lower 32bit) and total capacity (upper 32bit) of pooled bytebuffers.
-  // Both are non-negative. They are equal to or larger than those of the actual
-  // queued buffers in any transition.
-  private final AtomicLong stateRef = new AtomicLong();
-
-  @VisibleForTesting
-  static int toCountOfBuffers(long state) {
-    return (int)state;
-  }
-
-  @VisibleForTesting
-  static int toTotalCapacity(long state) {
-    return (int)(state >>> 32);
-  }
-
-  @VisibleForTesting
-  static long toState(int countOfBuffers, int totalCapacity) {
-    return ((long)totalCapacity << 32) | countOfBuffers;
-  }
-
-  @VisibleForTesting
-  static long subtractOneBufferFromState(long state, int capacity) {
-    return state - ((long)capacity << 32) - 1;
-  }
-
-  // For reporting, only used in the log
-  private final AtomicLong allocationsRef = new AtomicLong();
-
-  /**
-   * @param maxByteBufferSizeToCache
-   * @param initialByteBufferSize
-   * @param maxToCache
-   */
-  public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
-      final int maxToCache) {
-    this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
-    this.runningAverageRef = new AtomicInteger(initialByteBufferSize);
-    this.maxToCache = maxToCache;
-  }
-
-  public ByteBuffer getBuffer() {
-    ByteBuffer bb = buffers.poll();
-    if (bb != null) {
-      long state;
-      while (true) {
-        long prevState = stateRef.get();
-        state = subtractOneBufferFromState(prevState, bb.capacity());
-        if (stateRef.compareAndSet(prevState, state)) {
-          break;
-        }
-      }
-      // Clear sets limit == capacity. Postion == 0.
-      bb.clear();
-
-      if (LOG.isTraceEnabled()) {
-        int countOfBuffers = toCountOfBuffers(state);
-        int totalCapacity = toTotalCapacity(state);
-        LOG.trace("totalCapacity=" + totalCapacity + ", count=" + countOfBuffers);
-      }
-      return bb;
-    }
-
-    int runningAverage = runningAverageRef.get();
-    bb = ByteBuffer.allocateDirect(runningAverage);
-
-    if (LOG.isTraceEnabled()) {
-      long allocations = allocationsRef.incrementAndGet();
-      LOG.trace("runningAverage=" + runningAverage + ", allocations=" + allocations);
-    }
-    return bb;
-  }
-
-  public void putBuffer(ByteBuffer bb) {
-    // If buffer is larger than we want to keep around, just let it go.
-    if (bb.capacity() > maxByteBufferSizeToCache) {
-      return;
-    }
-
-    int countOfBuffers;
-    int totalCapacity;
-    while (true) {
-      long prevState = stateRef.get();
-      countOfBuffers = toCountOfBuffers(prevState);
-      if (countOfBuffers >= maxToCache) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("At capacity: " + countOfBuffers);
-        }
-        return;
-      }
-      countOfBuffers++;
-      assert 0 < countOfBuffers && countOfBuffers <= maxToCache;
-
-      totalCapacity = toTotalCapacity(prevState) + bb.capacity();
-      if (totalCapacity < 0) {
-        if (LOG.isWarnEnabled()) {
-          LOG.warn("Overflowed total capacity.");
-        }
-        return;
-      }
-
-      long state = toState(countOfBuffers, totalCapacity);
-      if (stateRef.compareAndSet(prevState, state)) {
-        break;
-      }
-    }
-
-    // ConcurrentLinkQueue#offer says "this method will never return false"
-    buffers.offer(bb);
-
-    int runningAverageUpdate = Math.min(
-        totalCapacity / countOfBuffers, // size will never be 0.
-        maxByteBufferSizeToCache);
-    while (true) {
-      int prev = runningAverageRef.get();
-      if (prev >= runningAverageUpdate || // only rises, never recedes
-          runningAverageRef.compareAndSet(prev, runningAverageUpdate)) {
-        break;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb0624b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
deleted file mode 100644
index eca7712..0000000
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import static org.apache.hadoop.hbase.io.BoundedByteBufferPool.subtractOneBufferFromState;
-import static org.apache.hadoop.hbase.io.BoundedByteBufferPool.toCountOfBuffers;
-import static org.apache.hadoop.hbase.io.BoundedByteBufferPool.toState;
-import static org.apache.hadoop.hbase.io.BoundedByteBufferPool.toTotalCapacity;
-import static org.junit.Assert.assertEquals;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-import org.apache.hadoop.hbase.testclassification.IOTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ IOTests.class, SmallTests.class })
-public class TestBoundedByteBufferPool {
-  final int maxByteBufferSizeToCache = 10;
-  final int initialByteBufferSize = 1;
-  final int maxToCache = 10;
-  BoundedByteBufferPool reservoir;
-
-  @Before
-  public void before() {
-    this.reservoir =
-      new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache);
-  }
-
-  @After
-  public void after() {
-    this.reservoir = null;
-  }
-
-  @Test
-  public void testEquivalence() {
-    ByteBuffer bb = ByteBuffer.allocate(1);
-    this.reservoir.putBuffer(bb);
-    this.reservoir.putBuffer(bb);
-    this.reservoir.putBuffer(bb);
-    assertEquals(3, this.reservoir.getQueueSize());
-  }
-
-  @Test
-  public void testGetPut() {
-    ByteBuffer bb = this.reservoir.getBuffer();
-    assertEquals(initialByteBufferSize, bb.capacity());
-    assertEquals(0, this.reservoir.getQueueSize());
-    this.reservoir.putBuffer(bb);
-    assertEquals(1, this.reservoir.getQueueSize());
-    // Now remove a buffer and don't put it back so reservoir is empty.
-    this.reservoir.getBuffer();
-    assertEquals(0, this.reservoir.getQueueSize());
-    // Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works.
-    // Need to add then remove, then get a new bytebuffer so reservoir internally is doing
-    // allocation
-    final int newCapacity = 2;
-    this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity));
-    assertEquals(1, reservoir.getQueueSize());
-    this.reservoir.getBuffer();
-    assertEquals(0, this.reservoir.getQueueSize());
-    bb = this.reservoir.getBuffer();
-    assertEquals(newCapacity, bb.capacity());
-    // Assert that adding a too-big buffer won't happen
-    assertEquals(0, this.reservoir.getQueueSize());
-    this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2));
-    assertEquals(0, this.reservoir.getQueueSize());
-    // Assert we can't add more than max allowed instances.
-    for (int i = 0; i < maxToCache; i++) {
-      this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize));
-    }
-    assertEquals(maxToCache, this.reservoir.getQueueSize());
-  }
-
-  @Test
-  public void testBufferSizeGrowWithMultiThread() throws Exception {
-    final ConcurrentLinkedDeque<ByteBuffer> bufferQueue = new ConcurrentLinkedDeque<>();
-    int takeBufferThreadsCount = 30;
-    int putBufferThreadsCount = 1;
-    Thread takeBufferThreads[] = new Thread[takeBufferThreadsCount];
-    for (int i = 0; i < takeBufferThreadsCount; i++) {
-      takeBufferThreads[i] = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          while (true) {
-            ByteBuffer buffer = reservoir.getBuffer();
-            try {
-              Thread.sleep(5);
-            } catch (InterruptedException e) {
-              break;
-            }
-            bufferQueue.offer(buffer);
-            if (Thread.currentThread().isInterrupted()) break;
-          }
-        }
-      });
-    }
-
-    Thread putBufferThread[] = new Thread[putBufferThreadsCount];
-    for (int i = 0; i < putBufferThreadsCount; i++) {
-      putBufferThread[i] = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          while (true) {
-            ByteBuffer buffer = bufferQueue.poll();
-            if (buffer != null) {
-              reservoir.putBuffer(buffer);
-            }
-            if (Thread.currentThread().isInterrupted()) break;
-          }
-        }
-      });
-    }
-
-    for (int i = 0; i < takeBufferThreadsCount; i++) {
-      takeBufferThreads[i].start();
-    }
-    for (int i = 0; i < putBufferThreadsCount; i++) {
-      putBufferThread[i].start();
-    }
-    Thread.sleep(2 * 1000);// Let the threads run for 2 secs
-    for (int i = 0; i < takeBufferThreadsCount; i++) {
-      takeBufferThreads[i].interrupt();
-      takeBufferThreads[i].join();
-    }
-    for (int i = 0; i < putBufferThreadsCount; i++) {
-      putBufferThread[i].interrupt();
-      putBufferThread[i].join();
-    }
-    // None of the BBs we got from pool is growing while in use. So we should not change the
-    // runningAverage in pool
-    assertEquals(initialByteBufferSize, this.reservoir.getRunningAverage());
-  }
-
-  @Test
-  public void testStateConversionMethods() {
-    int countOfBuffers = 123;
-    int totalCapacity = 456;
-
-    long state = toState(countOfBuffers, totalCapacity);
-    assertEquals(countOfBuffers, toCountOfBuffers(state));
-    assertEquals(totalCapacity, toTotalCapacity(state));
-
-    long state2 = subtractOneBufferFromState(state, 7);
-    assertEquals(countOfBuffers - 1, toCountOfBuffers(state2));
-    assertEquals(totalCapacity - 7, toTotalCapacity(state2));
-  }
-}


Mime
View raw message