hbase-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stack <st...@apache.org>
Subject Re: [1/2] hbase git commit: HBASE-13142 [PERF] Reuse the IPCUtil#buildCellBlock buffer Rename ByteBufferReservoir as BoundedByteBufferPool
Date Thu, 05 Mar 2015 06:59:08 GMT
Mistaken commit. Reverted.

On Wed, Mar 4, 2015 at 10:56 PM, <stack@apache.org> wrote:

> Repository: hbase
> Updated Branches:
>   refs/heads/master 5bd27af8b -> d259bd402
>
>
>     HBASE-13142 [PERF] Reuse the IPCUtil#buildCellBlock buffer
>     Rename ByteBufferReservoir as BoundedByteBufferPool
>
>
> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
> Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/55f8f56a
> Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/55f8f56a
> Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/55f8f56a
>
> Branch: refs/heads/master
> Commit: 55f8f56ad28f71a893acb1e5993689499134a018
> Parents: 5bd27af
> Author: stack <stack@apache.org>
> Authored: Wed Mar 4 22:44:20 2015 -0800
> Committer: stack <stack@apache.org>
> Committed: Wed Mar 4 22:44:20 2015 -0800
>
> ----------------------------------------------------------------------
>  .../org/apache/hadoop/hbase/ipc/IPCUtil.java    |  48 ++++++--
>  .../hadoop/hbase/io/BoundedByteBufferPool.java  | 118 +++++++++++++++++++
>  .../hadoop/hbase/io/ByteBufferOutputStream.java |  32 +++--
>  .../hadoop/hbase/io/TestByteBufferResevoir.java | 107 +++++++++++++++++
>  .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  33 +++++-
>  .../hbase/io/TestByteBufferOutputStream.java    |  46 ++++++++
>  6 files changed, 356 insertions(+), 28 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/hbase/blob/55f8f56a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
> ----------------------------------------------------------------------
> diff --git
> a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
> b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
> index b7e7728..63c2143 100644
> --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
> +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
> @@ -65,6 +65,7 @@ public class IPCUtil {
>      this.conf = conf;
>      this.cellBlockDecompressionMultiplier =
>
>  conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
> +
>      // Guess that 16k is a good size for rpc buffer.  Could go bigger.
> See the TODO below in
>      // #buildCellBlock.
>      this.cellBlockBuildingInitialBufferSize =
> @@ -91,23 +92,44 @@ public class IPCUtil {
>    public ByteBuffer buildCellBlock(final Codec codec, final
> CompressionCodec compressor,
>      final CellScanner cellScanner)
>    throws IOException {
> +    return buildCellBlock(codec, compressor, cellScanner, null);
> +  }
> +
> +  /**
> +   * Puts CellScanner Cells into a cell block using passed in
> <code>codec</code> and/or
> +   * <code>compressor</code>.
> +   * @param codec
> +   * @param compressor
> +   * @param cellScanner
> +   * @return Null or byte buffer filled with a cellblock filled with
> passed-in Cells encoded using
> +   * passed in <code>codec</code> and/or <code>compressor</code>;
the
> returned buffer has been
> +   * flipped and is ready for reading.  Use limit to find total size.
> +   * @param bb Use this bb. Can be null if no reuse going on.
> +   * @throws IOException
> +   */
> +  @SuppressWarnings("resource")
> +  public ByteBuffer buildCellBlock(final Codec codec, final
> CompressionCodec compressor,
> +    final CellScanner cellScanner, final ByteBuffer bb)
> +  throws IOException {
>      if (cellScanner == null) return null;
>      if (codec == null) throw new CellScannerButNoCodecException();
>      int bufferSize = this.cellBlockBuildingInitialBufferSize;
> -    if (cellScanner instanceof HeapSize) {
> -      long longSize = ((HeapSize)cellScanner).heapSize();
> -      // Just make sure we don't have a size bigger than an int.
> -      if (longSize > Integer.MAX_VALUE) {
> -        throw new IOException("Size " + longSize + " > " +
> Integer.MAX_VALUE);
> +    ByteBufferOutputStream baos = null;
> +    if (bb != null) {
> +      bufferSize = bb.capacity();
> +      baos = new ByteBufferOutputStream(bb);
> +    } else {
> +      // Then we need to make our own to return.
> +      if (cellScanner instanceof HeapSize) {
> +        long longSize = ((HeapSize)cellScanner).heapSize();
> +        // Just make sure we don't have a size bigger than an int.
> +        if (longSize > Integer.MAX_VALUE) {
> +          throw new IOException("Size " + longSize + " > " +
> Integer.MAX_VALUE);
> +        }
> +        bufferSize = ClassSize.align((int)longSize);
>        }
> -      bufferSize = ClassSize.align((int)longSize);
> -    } // TODO: Else, get estimate on size of buffer rather than have the
> buffer resize.
> -    // See TestIPCUtil main for experiment where we spin through the
> Cells getting estimate of
> -    // total size before creating the buffer.  It costs some small
> percentage.  If we are usually
> -    // within the estimated buffer size, then the cost is not worth it.
> If we are often well
> -    // outside the guesstimated buffer size, the processing can be done
> in half the time if we
> -    // go w/ the estimated size rather than let the buffer resize.
> -    ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
> +      baos = new ByteBufferOutputStream(bufferSize);
> +    }
>      OutputStream os = baos;
>      Compressor poolCompressor = null;
>      try {
>
>
> http://git-wip-us.apache.org/repos/asf/hbase/blob/55f8f56a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
> ----------------------------------------------------------------------
> diff --git
> a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
> b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
> new file mode 100644
> index 0000000..1ed7db0
> --- /dev/null
> +++
> b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
> @@ -0,0 +1,118 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.hadoop.hbase.io;
> +
> +import java.nio.ByteBuffer;
> +import java.util.NavigableMap;
> +import java.util.TreeMap;
> +
> +import org.apache.commons.logging.Log;
> +import org.apache.commons.logging.LogFactory;
> +import org.apache.hadoop.hbase.classification.InterfaceAudience;
> +
> +import com.google.common.annotations.VisibleForTesting;
> +
> +/**
> + * Like Hadoop's ByteBufferPool only you do not specify desired size when
> getting a ByteBuffer. We
> + * also keep upper bounds on ByteBuffer size and amount of ByteBuffers we
> keep in the pool hence
> + * it is 'bounded' as opposed to 'elastic' as in ElasticByteBufferPool.
> If a ByteBuffer is bigger
> + * than a threshold, we will just let the ByteBuffer go rather than keep
> it around. If more
> + * ByteBuffers than configured maximum instances, then we do not cache
> either (we will log a
> + * WARN in this case).
> + *
> + * <p>The intended use case is a reservoir of bytebuffers that an RPC can
> reuse; buffers tend to
> + * achieve a particular 'run' size over time give or take a few extremes.
> + *
> + * <p>Thread safe.
> + */
> +@InterfaceAudience.Private
> +public class BoundedByteBufferPool {
> +  private final Log LOG = LogFactory.getLog(this.getClass());
> +
> +  private final class Key implements Comparable<Key> {
> +    private final int capacity;
> +
> +    Key(final int capacity) {
> +      this.capacity = capacity;
> +    }
> +
> +    @Override
> +    public int compareTo(Key that) {
> +      if (this.capacity < that.capacity) return -1;
> +      if (this.capacity > that.capacity) return 1;
> +      return this.hashCode() - that.hashCode();
> +    }
> +  }
> +
> +  @VisibleForTesting
> +  final NavigableMap<Key, ByteBuffer> buffers = new TreeMap<Key,
> ByteBuffer>();
> +
> +  private final int maxByteBufferSizeToCache;
> +  private final int maxToCache;
> +  // A running average only it just rises, never recedes
> +  private int runningAverage;
> +  private int totalReservoirCapacity;
> +
> +  /**
> +   * @param maxByteBufferSizeToCache
> +   * @param initialByteBufferSize
> +   * @param maxToCache
> +   */
> +  public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final
> int initialByteBufferSize,
> +      final int maxToCache) {
> +    this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
> +    this.runningAverage = initialByteBufferSize;
> +    this.maxToCache = maxToCache;
> +  }
> +
> +  public synchronized ByteBuffer getBuffer() {
> +    Key key = this.buffers.isEmpty()? null: this.buffers.firstKey();
> +    ByteBuffer bb = null;
> +    if (key == null) {
> +      bb = ByteBuffer.allocate(this.runningAverage);
> +    } else {
> +      bb = this.buffers.remove(key);
> +      if (bb ==  null) throw new IllegalStateException();
> +      bb.clear();
> +      this.totalReservoirCapacity -= bb.capacity();
> +    }
> +    if (LOG.isTraceEnabled()) {
> +      LOG.trace("runningAverage=" + this.runningAverage +
> +        ", totalCapacity=" + this.totalReservoirCapacity + ", count=" +
> this.buffers.size());
> +    }
> +    return bb;
> +  }
> +
> +  public synchronized void putBuffer(ByteBuffer buffer) {
> +    // If buffer is larger than we want to keep around, just let it go.
> +    if (buffer.capacity() > this.maxByteBufferSizeToCache) return;
> +    // futureSize is how many byte buffers the reservoir will have if
> this method succeeds.
> +    int futureSize = this.buffers.size() + 1;
> +    if (futureSize > this.maxToCache) {
> +      // If at max size, something is wrong. WARN.
> +      if (LOG.isWarnEnabled()) LOG.warn("At capacity: " + futureSize);
> +      return;
> +    }
> +    this.totalReservoirCapacity += buffer.capacity();
> +    int average = this.totalReservoirCapacity / futureSize;
> +    if (average > this.runningAverage && average <
> this.maxByteBufferSizeToCache) {
> +      this.runningAverage = average;
> +    }
> +    this.buffers.put(new Key(buffer.capacity()), buffer);
> +  }
> +}
> \ No newline at end of file
>
>
> http://git-wip-us.apache.org/repos/asf/hbase/blob/55f8f56a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
> ----------------------------------------------------------------------
> diff --git
> a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
> b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
> index 257b850..eee5866 100644
> ---
> a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
> +++
> b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
> @@ -43,17 +43,32 @@ public class ByteBufferOutputStream extends
> OutputStream {
>    }
>
>    public ByteBufferOutputStream(int capacity, boolean
> useDirectByteBuffer) {
> -    if (useDirectByteBuffer) {
> -      buf = ByteBuffer.allocateDirect(capacity);
> -    } else {
> -      buf = ByteBuffer.allocate(capacity);
> -    }
> +    this(allocate(capacity, useDirectByteBuffer));
> +  }
> +
> +  /**
> +   * @param bb ByteBuffer to use. If too small, will be discarded and a
> new one allocated in its
> +   * place; i.e. the passed in BB may be DESTROYED!!! Minimally it will
> be altered. If you want
> +   * to obtain the newly allocated ByteBuffer, you'll need to pick it up
> when
> +   * done with this instance by calling {@link #getByteBuffer()}. All
> this encapsulation violation
> +   * is so we can recycle buffers rather than allocate each time; it can
> get expensive especially
> +   * if the buffers are big doing allocations each time or having them
> undergo resizing because
> +   * initial allocation was small.
> +   * @see #getByteBuffer()
> +   */
> +  public ByteBufferOutputStream(final ByteBuffer bb) {
> +    this.buf = bb;
> +    this.buf.clear();
>    }
>
>    public int size() {
>      return buf.position();
>    }
>
> +  private static ByteBuffer allocate(final int capacity, final boolean
> useDirectByteBuffer) {
> +    return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity):
> ByteBuffer.allocate(capacity);
> +  }
> +
>    /**
>     * This flips the underlying BB so be sure to use it _last_!
>     * @return ByteBuffer
> @@ -70,12 +85,7 @@ public class ByteBufferOutputStream extends
> OutputStream {
>        int newSize = (int)Math.min((((long)buf.capacity()) * 2),
>            (long)(Integer.MAX_VALUE));
>        newSize = Math.max(newSize, buf.position() + extra);
> -      ByteBuffer newBuf = null;
> -      if (buf.isDirect()) {
> -        newBuf = ByteBuffer.allocateDirect(newSize);
> -      } else {
> -        newBuf = ByteBuffer.allocate(newSize);
> -      }
> +      ByteBuffer newBuf = allocate(newSize, buf.isDirect());
>        buf.flip();
>        newBuf.put(buf);
>        buf = newBuf;
>
>
> http://git-wip-us.apache.org/repos/asf/hbase/blob/55f8f56a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferResevoir.java
> ----------------------------------------------------------------------
> diff --git
> a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferResevoir.java
> b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferResevoir.java
> new file mode 100644
> index 0000000..c847813
> --- /dev/null
> +++
> b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferResevoir.java
> @@ -0,0 +1,107 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.hadoop.hbase.io;
> +
> +import static org.junit.Assert.*;
> +
> +import java.nio.ByteBuffer;
> +
> +import org.apache.hadoop.hbase.testclassification.IOTests;
> +import org.apache.hadoop.hbase.testclassification.SmallTests;
> +import org.junit.After;
> +import org.junit.Before;
> +import org.junit.Test;
> +import org.junit.experimental.categories.Category;
> +
> +@Category({ IOTests.class, SmallTests.class })
> +public class TestByteBufferResevoir {
> +  final int maxByteBufferSizeToCache = 10;
> +  final int initialByteBufferSize = 1;
> +  final int maxToCache = 10;
> +  BoundedByteBufferPool reservoir;
> +
> +  @Before
> +  public void before() {
> +    this.reservoir =
> +      new BoundedByteBufferPool(maxByteBufferSizeToCache,
> initialByteBufferSize, maxToCache);
> +  }
> +
> +  @After
> +  public void after() {
> +    this.reservoir = null;
> +  }
> +
> +  @Test
> +  public void testGetPut() {
> +    ByteBuffer bb = this.reservoir.getBuffer();
> +    assertEquals(initialByteBufferSize, bb.capacity());
> +    assertEquals(0, this.reservoir.buffers.size());
> +    this.reservoir.putBuffer(bb);
> +    assertEquals(1, this.reservoir.buffers.size());
> +    // Now remove a buffer and don't put it back so reservoir is empty.
> +    this.reservoir.getBuffer();
> +    assertEquals(0, this.reservoir.buffers.size());
> +    // Try adding in a buffer with a bigger-than-initial size and see if
> our runningAverage works.
> +    // Need to add then remove, then get a new bytebuffer so reservoir
> internally is doing
> +    // allocation
> +    final int newCapacity = 2;
> +    this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity));
> +    assertEquals(1, reservoir.buffers.size());
> +    this.reservoir.getBuffer();
> +    assertEquals(0, this.reservoir.buffers.size());
> +    bb = this.reservoir.getBuffer();
> +    assertEquals(newCapacity, bb.capacity());
> +    // Assert that adding a too-big buffer won't happen
> +    assertEquals(0, this.reservoir.buffers.size());
> +    this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache
> * 2));
> +    assertEquals(0, this.reservoir.buffers.size());
> +    // Assert we can't add more than max allowed instances.
> +    for (int i = 0; i < maxToCache; i++) {
> +
> this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize));
> +    }
> +    assertEquals(maxToCache, this.reservoir.buffers.size());
> +  }
> +
> +  @Test
> +  public void testComesOutSmallestFirst() {
> +    // Put in bbs that are sized 1-5 in random order. Put in a few of
> size 2 and make sure they
> +    // each come out too.
> +    this.reservoir.putBuffer(ByteBuffer.allocate(5));
> +    assertEquals(1, this.reservoir.buffers.size());
> +    this.reservoir.putBuffer(ByteBuffer.allocate(2));
> +    assertEquals(2, this.reservoir.buffers.size());
> +    this.reservoir.putBuffer(ByteBuffer.allocate(2));
> +    assertEquals(3, this.reservoir.buffers.size());
> +    this.reservoir.putBuffer(ByteBuffer.allocate(3));
> +    assertEquals(4, this.reservoir.buffers.size());
> +    this.reservoir.putBuffer(ByteBuffer.allocate(1));
> +    assertEquals(5, this.reservoir.buffers.size());
> +    this.reservoir.putBuffer(ByteBuffer.allocate(2));
> +    assertEquals(6, this.reservoir.buffers.size());
> +    this.reservoir.putBuffer(ByteBuffer.allocate(4));
> +    assertEquals(7, this.reservoir.buffers.size());
> +    // Now get them out and they should come out smallest first.
> +    assertEquals(1, this.reservoir.getBuffer().capacity());
> +    assertEquals(2, this.reservoir.getBuffer().capacity());
> +    assertEquals(2, this.reservoir.getBuffer().capacity());
> +    assertEquals(2, this.reservoir.getBuffer().capacity());
> +    assertEquals(3, this.reservoir.getBuffer().capacity());
> +    assertEquals(4, this.reservoir.getBuffer().capacity());
> +    assertEquals(5, this.reservoir.getBuffer().capacity());
> +  }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/hbase/blob/55f8f56a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
> ----------------------------------------------------------------------
> diff --git
> a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
> b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
> index 064771c..13bd7b7 100644
> --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
> +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
> @@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.client.Operation;
>  import org.apache.hadoop.hbase.codec.Codec;
>  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
>  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
> +import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
>  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
>  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
>  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
> @@ -267,6 +268,9 @@ public class RpcServer implements RpcServerInterface {
>
>    private UserProvider userProvider;
>
> +  private final BoundedByteBufferPool reservoir;
> +
> +
>    /**
>     * Datastructure that holds all necessary to a method invocation and
> then afterward, carries
>     * the result.
> @@ -293,6 +297,7 @@ public class RpcServer implements RpcServerInterface {
>      protected long size;                          // size of current call
>      protected boolean isError;
>      protected TraceInfo tinfo;
> +    private ByteBuffer recycledByteBuffer = null;
>
>      Call(int id, final BlockingService service, final MethodDescriptor
> md, RequestHeader header,
>           Message param, CellScanner cellScanner, Connection connection,
> Responder responder,
> @@ -313,6 +318,19 @@ public class RpcServer implements RpcServerInterface {
>        this.tinfo = tinfo;
>      }
>
> +    /**
> +     * Call is done. Execution happened and we returned results to
> client. It is now safe to
> +     * cleanup.
> +     */
> +    void done() {
> +      if (this.recycledByteBuffer != null) {
> +        // Return buffer to reservoir now we are done with it.
> +        reservoir.putBuffer(this.recycledByteBuffer);
> +        this.recycledByteBuffer = null;
> +      }
> +      this.connection.decRpcCount();  // Say that we're done with this
> call.
> +    }
> +
>      @Override
>      public String toString() {
>        return toShortString() + " param: " +
> @@ -375,8 +393,9 @@ public class RpcServer implements RpcServerInterface {
>            // Set the exception as the result of the method invocation.
>            headerBuilder.setException(exceptionBuilder.build());
>          }
> -        ByteBuffer cellBlock =
> -          ipcUtil.buildCellBlock(this.connection.codec,
> this.connection.compressionCodec, cells);
> +        this.recycledByteBuffer = reservoir.getBuffer();
> +        ByteBuffer cellBlock =
> ipcUtil.buildCellBlock(this.connection.codec,
> +          this.connection.compressionCodec, cells, recycledByteBuffer);
>          if (cellBlock != null) {
>            CellBlockMeta.Builder cellBlockBuilder =
> CellBlockMeta.newBuilder();
>            // Presumes the cellBlock bytebuffer has been flipped so limit
> has total size in it.
> @@ -1051,7 +1070,7 @@ public class RpcServer implements RpcServerInterface
> {
>        }
>
>        if (!call.response.hasRemaining()) {
> -        call.connection.decRpcCount();  // Say that we're done with this
> call.
> +        call.done();
>          return true;
>        } else {
>          return false; // Socket can't take more, we will have to come
> back.
> @@ -1885,7 +1904,13 @@ public class RpcServer implements
> RpcServerInterface {
>        final InetSocketAddress bindAddress, Configuration conf,
>        RpcScheduler scheduler)
>        throws IOException {
> -
> +    this.reservoir = new BoundedByteBufferPool(
> +      conf.getInt("hbase.ipc.server.reservoir.max.buffer.size",  1024 *
> 1024),
> +      conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 *
> 1024),
> +      // Make the max twice the number of handlers to be safe.
> +      conf.getInt("hbase.ipc.server.reservoir.initial.max",
> +        conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
> +          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
>      this.server = server;
>      this.services = services;
>      this.bindAddress = bindAddress;
>
>
> http://git-wip-us.apache.org/repos/asf/hbase/blob/55f8f56a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
> ----------------------------------------------------------------------
> diff --git
> a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
> b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
> new file mode 100644
> index 0000000..e39b725
> --- /dev/null
> +++
> b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
> @@ -0,0 +1,46 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.hadoop.hbase.io;
> +
> +import static org.junit.Assert.*;
> +
> +import java.io.IOException;
> +import java.nio.ByteBuffer;
> +
> +import org.apache.hadoop.hbase.util.Bytes;
> +import org.junit.Test;
> +
> +public class TestByteBufferOutputStream {
> +  @Test
> +  public void testByteBufferReuse() throws IOException {
> +    Bytes.toBytes("some bytes");
> +    ByteBuffer bb = ByteBuffer.allocate(16);
> +    ByteBuffer bbToReuse = write(bb, Bytes.toBytes("some bytes"));
> +    bbToReuse = write(bbToReuse, Bytes.toBytes("less"));
> +    assertTrue(bb == bbToReuse);
> +  }
> +
> +  private ByteBuffer write(final ByteBuffer bb, final byte [] bytes)
> throws IOException {
> +    try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(bb)) {
> +      bbos.write(bytes);
> +      assertTrue(Bytes.compareTo(bytes, bbos.toByteArray(0,
> bytes.length)) == 0);
> +      bbos.flush();
> +      return bbos.getByteBuffer();
> +    }
> +  }
> +}
> \ No newline at end of file
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message