hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject hbase git commit: HBASE-13207 Backport to 0.98 '[PERF] Reuse the IPC buffers...
Date Mon, 16 Mar 2015 20:05:23 GMT
Repository: hbase
Updated Branches:
  refs/heads/0.98 48f69550e -> 7fe3f3377


 HBASE-13207 Backport to 0.98 '[PERF] Reuse the IPC buffers...


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7fe3f337
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7fe3f337
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7fe3f337

Branch: refs/heads/0.98
Commit: 7fe3f33779b1ce7ffb9dad28f2c1a0244d97a2e8
Parents: 48f6955
Author: stack <stack@apache.org>
Authored: Mon Mar 16 13:05:53 2015 -0700
Committer: stack <stack@apache.org>
Committed: Mon Mar 16 13:05:53 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/IPCUtil.java    |  53 ++++++---
 .../hadoop/hbase/io/BoundedByteBufferPool.java  | 113 +++++++++++++++++++
 .../hadoop/hbase/io/ByteBufferOutputStream.java |  32 ++++--
 .../hbase/io/TestBoundedByteBufferPool.java     |  87 ++++++++++++++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  42 +++++--
 .../hbase/io/TestByteBufferOutputStream.java    |  52 +++++++++
 6 files changed, 348 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7fe3f337/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 6d00adc..365119a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -63,6 +64,7 @@ class IPCUtil {
     this.conf = conf;
     this.cellBlockDecompressionMultiplier =
         conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+
     // Guess that 16k is a good size for rpc buffer.  Could go bigger.  See the TODO below in
     // #buildCellBlock.
     this.cellBlockBuildingInitialBufferSize =
@@ -89,23 +91,48 @@ class IPCUtil {
   ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
     final CellScanner cellScanner)
   throws IOException {
+    return buildCellBlock(codec, compressor, cellScanner, null);
+  }
+
+  /**
+   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+   * <code>compressor</code>.
+   * @param codec
+   * @param compressor
+   * @param cellScanner
+   * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
+   * our own ByteBuffer.
+   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
+   * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
+   * flipped and is ready for reading.  Use limit to find total size. If <code>pool</code> was not
+   * null, then this returned ByteBuffer came from there and should be returned to the pool when
+   * done.
+   * @throws IOException
+   */
+  @SuppressWarnings("resource")
+  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+    final CellScanner cellScanner, final BoundedByteBufferPool pool)
+  throws IOException {
     if (cellScanner == null) return null;
     if (codec == null) throw new CellScannerButNoCodecException();
     int bufferSize = this.cellBlockBuildingInitialBufferSize;
-    if (cellScanner instanceof HeapSize) {
-      long longSize = ((HeapSize)cellScanner).heapSize();
-      // Just make sure we don't have a size bigger than an int.
-      if (longSize > Integer.MAX_VALUE) {
-        throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
+    ByteBufferOutputStream baos = null;
+    if (pool != null) {
+      ByteBuffer bb = pool.getBuffer();
+      bufferSize = bb.capacity();
+      baos = new ByteBufferOutputStream(bb);
+    } else {
+      // Then we need to make our own to return.
+      if (cellScanner instanceof HeapSize) {
+        long longSize = ((HeapSize)cellScanner).heapSize();
+        // Just make sure we don't have a size bigger than an int.
+        if (longSize > Integer.MAX_VALUE) {
+          throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
+        }
+        bufferSize = ClassSize.align((int)longSize);
       }
-      bufferSize = ClassSize.align((int)longSize);
-    } // TODO: Else, get estimate on size of buffer rather than have the buffer resize.
-    // See TestIPCUtil main for experiment where we spin through the Cells getting estimate of
-    // total size before creating the buffer.  It costs some small percentage.  If we are usually
-    // within the estimated buffer size, then the cost is not worth it.  If we are often well
-    // outside the guesstimated buffer size, the processing can be done in half the time if we
-    // go w/ the estimated size rather than let the buffer resize.
-    ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
+      baos = new ByteBufferOutputStream(bufferSize);
+    }
     OutputStream os = baos;
     Compressor poolCompressor = null;
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7fe3f337/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
new file mode 100644
index 0000000..7eaa6cf
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Like Hadoop's ByteBufferPool only you do not specify desired size when getting a ByteBuffer.
+ * This pool keeps an upper bound on the count of ByteBuffers in the pool and on the maximum size
+ * of ByteBuffer that it will retain (Hence the pool is 'bounded' as opposed to, say,
+ * Hadoop's ElasticByteBufferPool).
+ * If a ByteBuffer is bigger than the configured threshold, we will just let the ByteBuffer go
+ * rather than add it to the pool. If more ByteBuffers than the configured maximum instances,
+ * we will not add the passed ByteBuffer to the pool; we will just drop it
+ * (we will log a WARN in this case that we are at capacity).
+ *
+ * <p>The intended use case is a reservoir of bytebuffers that an RPC can reuse; buffers tend to
+ * achieve a particular 'run' size over time give or take a few extremes. Set TRACE level on this
+ * class for a couple of seconds to get reporting on how it is running when deployed.
+ *
+ * <p>This class is thread safe.
+ */
+@InterfaceAudience.Private
+public class BoundedByteBufferPool {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  @VisibleForTesting
+  final Queue<ByteBuffer> buffers;
+
+  // Maximum size of a ByteBuffer to retain in pool
+  private final int maxByteBufferSizeToCache;
+
+  // A running average only it only rises, it never recedes
+  private volatile int runningAverage;
+
+  // Scratch that keeps rough total size of pooled bytebuffers
+  private volatile int totalReservoirCapacity;
+
+  // For reporting
+  private AtomicLong allocations = new AtomicLong(0);
+
+  /**
+   * @param maxByteBufferSizeToCache
+   * @param initialByteBufferSize
+   * @param maxToCache
+   */
+  public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
+      final int maxToCache) {
+    this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
+    this.runningAverage = initialByteBufferSize;
+    this.buffers = new ArrayBlockingQueue<ByteBuffer>(maxToCache, true);
+  }
+
+  public ByteBuffer getBuffer() {
+    ByteBuffer bb = this.buffers.poll();
+    if (bb != null) {
+      // Clear sets limit == capacity.  Position == 0.
+      bb.clear();
+      this.totalReservoirCapacity -= bb.capacity();
+    } else {
+      bb = ByteBuffer.allocate(this.runningAverage);
+      this.allocations.incrementAndGet();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("runningAverage=" + this.runningAverage +
+        ", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size() +
+        ", allocations=" + this.allocations.get());
+    }
+    return bb;
+  }
+
+  public void putBuffer(ByteBuffer bb) {
+    // If buffer is larger than we want to keep around, just let it go.
+    if (bb.capacity() > this.maxByteBufferSizeToCache) return;
+    if (!this.buffers.offer(bb)) {
+      LOG.warn("At capacity: " + this.buffers.size());
+    } else {
+      int size = this.buffers.size(); // This size may be inexact.
+      this.totalReservoirCapacity += bb.capacity();
+      int average = 0;
+      if (size != 0) {
+        average = this.totalReservoirCapacity / size;
+      }
+      if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
+        this.runningAverage = average;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7fe3f337/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index 257b850..af12113 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -43,17 +43,32 @@ public class ByteBufferOutputStream extends OutputStream {
   }
 
   public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
-    if (useDirectByteBuffer) {
-      buf = ByteBuffer.allocateDirect(capacity);
-    } else {
-      buf = ByteBuffer.allocate(capacity);
-    }
+    this(allocate(capacity, useDirectByteBuffer));
+  }
+
+  /**
+   * @param bb ByteBuffer to use. If too small, will be discarded and a new one allocated in its
+   * place; i.e. the passed in BB may NOT BE RETURNED!! Minimally it will be altered. SIDE EFFECT!!
+   * If you want to get the newly allocated ByteBuffer, you'll need to pick it up when
+   * done with this instance by calling {@link #getByteBuffer()}. All this encapsulation violation
+   * is so we can recycle buffers rather than allocate each time; it can get expensive especially
+   * if the buffers are big doing allocations each time or having them undergo resizing because
+   * initial allocation was small.
+   * @see #getByteBuffer()
+   */
+  public ByteBufferOutputStream(final ByteBuffer bb) {
+    this.buf = bb;
+    this.buf.clear();
   }
 
   public int size() {
     return buf.position();
   }
 
+  private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
+    return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
+  }
+
   /**
    * This flips the underlying BB so be sure to use it _last_!
    * @return ByteBuffer
@@ -70,12 +85,7 @@ public class ByteBufferOutputStream extends OutputStream {
       int newSize = (int)Math.min((((long)buf.capacity()) * 2),
           (long)(Integer.MAX_VALUE));
       newSize = Math.max(newSize, buf.position() + extra);
-      ByteBuffer newBuf = null;
-      if (buf.isDirect()) {
-        newBuf = ByteBuffer.allocateDirect(newSize);
-      } else {
-        newBuf = ByteBuffer.allocate(newSize);
-      }
+      ByteBuffer newBuf = allocate(newSize, buf.isDirect());
       buf.flip();
       newBuf.put(buf);
       buf = newBuf;

http://git-wip-us.apache.org/repos/asf/hbase/blob/7fe3f337/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
new file mode 100644
index 0000000..79b9e68
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class })
+public class TestBoundedByteBufferPool {
+  final int maxByteBufferSizeToCache = 10;
+  final int initialByteBufferSize = 1;
+  final int maxToCache = 10;
+  BoundedByteBufferPool reservoir;
+
+  @Before
+  public void before() {
+    this.reservoir =
+      new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache);
+  }
+
+  @After
+  public void after() {
+    this.reservoir = null;
+  }
+
+  @Test
+  public void testEquivalence() {
+    ByteBuffer bb = ByteBuffer.allocate(1);
+    this.reservoir.putBuffer(bb);
+    this.reservoir.putBuffer(bb);
+    this.reservoir.putBuffer(bb);
+    assertEquals(3, this.reservoir.buffers.size());
+  }
+
+  @Test
+  public void testGetPut() {
+    ByteBuffer bb = this.reservoir.getBuffer();
+    assertEquals(initialByteBufferSize, bb.capacity());
+    assertEquals(0, this.reservoir.buffers.size());
+    this.reservoir.putBuffer(bb);
+    assertEquals(1, this.reservoir.buffers.size());
+    // Now remove a buffer and don't put it back so reservoir is empty.
+    this.reservoir.getBuffer();
+    assertEquals(0, this.reservoir.buffers.size());
+    // Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works.
+    // Need to add then remove, then get a new bytebuffer so reservoir internally is doing
+    // allocation
+    final int newCapacity = 2;
+    this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity));
+    assertEquals(1, reservoir.buffers.size());
+    this.reservoir.getBuffer();
+    assertEquals(0, this.reservoir.buffers.size());
+    bb = this.reservoir.getBuffer();
+    assertEquals(newCapacity, bb.capacity());
+    // Assert that adding a too-big buffer won't happen
+    assertEquals(0, this.reservoir.buffers.size());
+    this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2));
+    assertEquals(0, this.reservoir.buffers.size());
+    // Assert we can't add more than max allowed instances.
+    for (int i = 0; i < maxToCache; i++) {
+      this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize));
+    }
+    assertEquals(maxToCache, this.reservoir.buffers.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7fe3f337/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index f031b0b..aafdc02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -264,6 +265,9 @@ public class RpcServer implements RpcServerInterface {
 
   private UserProvider userProvider;
 
+  private final BoundedByteBufferPool reservoir;
+
+
   /**
    * Datastructure that holds all necessary to a method invocation and then afterward, carries
    * the result.
@@ -290,6 +294,7 @@ public class RpcServer implements RpcServerInterface {
     protected long size;                          // size of current call
     protected boolean isError;
     protected TraceInfo tinfo;
+    private ByteBuffer cellBlock = null;
 
     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
          Message param, CellScanner cellScanner, Connection connection, Responder responder,
@@ -310,6 +315,18 @@ public class RpcServer implements RpcServerInterface {
       this.tinfo = tinfo;
     }
 
+    /**
+     * Call is done. Execution happened and we returned results to client. It is now safe to
+     * cleanup.
+     */
+    void done() {
+      if (this.cellBlock != null) {
+        // Return buffer to reservoir now we are done with it.
+        reservoir.putBuffer(this.cellBlock);
+        this.cellBlock = null;
+      }
+    }
+
     @Override
     public String toString() {
       return toShortString() + " param: " +
@@ -381,12 +398,15 @@ public class RpcServer implements RpcServerInterface {
           // Set the exception as the result of the method invocation.
           headerBuilder.setException(exceptionBuilder.build());
         }
-        ByteBuffer cellBlock =
-          ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells);
-        if (cellBlock != null) {
+        // Pass reservoir to buildCellBlock. Keep reference to returned so can add it back to the
+        // reservoir when finished. This is hacky and the hack is not contained but benefits are
+        // high when we can avoid a big buffer allocation on each rpc.
+        this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
+          this.connection.compressionCodec, cells, reservoir);
+        if (this.cellBlock != null) {
           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
          // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
-          cellBlockBuilder.setLength(cellBlock.limit());
+          cellBlockBuilder.setLength(this.cellBlock.limit());
           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
         }
         Message header = headerBuilder.build();
@@ -396,9 +416,9 @@ public class RpcServer implements RpcServerInterface {
         ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
         ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
         int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
-          (cellBlock == null? 0: cellBlock.limit());
+          (this.cellBlock == null? 0: this.cellBlock.limit());
         ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
-        bc = new BufferChain(bbTotalSize, bbHeader, bbResult, cellBlock);
+        bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
         if (connection.useWrap) {
           bc = wrapWithSasl(bc);
         }
@@ -1051,6 +1071,7 @@ public class RpcServer implements RpcServerInterface {
           closeConnection(call.connection);
         }
       }
+      if (done) call.done();
       return done;
     }
 
@@ -1852,8 +1873,15 @@ public class RpcServer implements RpcServerInterface {
       final List<BlockingServiceAndInterface> services,
       final InetSocketAddress isa, Configuration conf,
       RpcScheduler scheduler)
-  throws IOException {
+      throws IOException {
     this.serverInstance = serverInstance;
+    this.reservoir = new BoundedByteBufferPool(
+      conf.getInt("hbase.ipc.server.reservoir.max.buffer.size",  1024 * 1024),
+      conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
+      // Make the max twice the number of handlers to be safe.
+      conf.getInt("hbase.ipc.server.reservoir.initial.max",
+        conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
     this.services = services;
     this.isa = isa;
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/hbase/blob/7fe3f337/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
new file mode 100644
index 0000000..556c363
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestByteBufferOutputStream {
+  @Test
+  public void testByteBufferReuse() throws IOException {
+    byte [] someBytes = Bytes.toBytes("some bytes");
+    ByteBuffer bb = ByteBuffer.allocate(someBytes.length);
+    ByteBuffer bbToReuse = write(bb, someBytes);
+    bbToReuse = write(bbToReuse, Bytes.toBytes("less"));
+    assertTrue(bb == bbToReuse);
+  }
+
+  private ByteBuffer write(final ByteBuffer bb, final byte [] bytes) throws IOException {
+    ByteBufferOutputStream bbos = new ByteBufferOutputStream(bb);
+    try {
+      bbos.write(bytes);
+      assertTrue(Bytes.compareTo(bytes, bbos.toByteArray(0, bytes.length)) == 0);
+      bbos.flush();
+      return bbos.getByteBuffer();
+    } finally {
+      bbos.close();
+    }
+  }
+}


Mime
View raw message