bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eolive...@apache.org
Subject [bookkeeper] branch master updated: ISSUE #271: LedgerHandle#readEntries leaks ByteBufs
Date Thu, 27 Jul 2017 03:04:09 GMT
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new c00cb9f  ISSUE #271: LedgerHandle#readEntries leaks ByteBufs
c00cb9f is described below

commit c00cb9f69104c99d7ad10242c04175a5d9ef3801
Author: Enrico Olivelli <eolivelli@apache.org>
AuthorDate: Thu Jul 27 05:04:03 2017 +0200

    ISSUE #271: LedgerHandle#readEntries leaks ByteBufs
    
    Add a setNettyUsePooledBuffers() client configuration option to let the user decide
    whether to use pooled or unpooled ByteBufs.
    When using the v2 wire protocol, the application receives the original ByteBuf returned
    from the Channel and is responsible for releasing it.
    
    Add assertions on LedgerEntry to prevent the user from accessing data more than once
    
    Author: Enrico Olivelli <eolivelli@apache.org>
    
    Reviewers: Matteo Merli, Sijie Guo
    
    This closes #276 from eolivelli/issue-271-leaks, closes #271
---
 .../org/apache/bookkeeper/client/LedgerEntry.java  |  31 ++++-
 .../bookkeeper/conf/ClientConfiguration.java       |  30 +++++
 .../bookkeeper/proto/PerChannelBookieClient.java   |  11 +-
 .../apache/bookkeeper/client/BookKeeperTest.java   | 137 +++++++++++++++++++++
 4 files changed, 205 insertions(+), 4 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
index ed9fb29..3187127 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
@@ -20,10 +20,12 @@
  */
 package org.apache.bookkeeper.client;
 
+import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
 
 import java.io.InputStream;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 
 /**
  * Ledger entry. Its a simple tuple containing the ledger id, the entry-id, and
@@ -53,22 +55,45 @@ public class LedgerEntry {
         return length;
     }
 
+    /**
+     * Returns the content of the entry.
+     * This method can be called only once. While using v2 wire protocol this method will
automatically release
+     * the internal ByteBuf
+     * 
+     * @return the content of the entry
+     */
     public byte[] getEntry() {
+        Preconditions.checkNotNull(data, "entry content can be accessed only once");
         byte[] entry = new byte[data.readableBytes()];
         data.readBytes(entry);
         data.release();
+        data = null;
         return entry;
     }
 
+    /**
+     * Returns the content of the entry.
+     * This method can be called only once. While using v2 wire protocol this method will
automatically release
+     * the internal ByteBuf when calling the close
+     * method of the returned InputStream
+     *
+     * @return an InputStream which gives access to the content of the entry
+     */
     public InputStream getEntryInputStream() {
-        return new ByteBufInputStream(data);
+        Preconditions.checkNotNull(data, "entry content can be accessed only once");
+        ByteBufInputStream res = new ByteBufInputStream(data);
+        data = null;
+        return res;
     }
 
     /**
      * Return the internal buffer that contains the entry payload.
-     * <p>
      *
-     * Note: It is responsibility of the caller to ensure to release the buffer after usage.
+     * Note: Using v2 wire protocol it is responsibility of the caller to ensure to release
the buffer after usage.
+     *
+     * @return a ByteBuf which contains the data
+     *
+     * @see ClientConfiguration#setNettyUsePooledBuffers(boolean)
      */
     public ByteBuf getEntryBuffer() {
         return data;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 157c6b5..6252786 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -18,6 +18,7 @@
 package org.apache.bookkeeper.conf;
 
 import static com.google.common.base.Charsets.UTF_8;
+import io.netty.buffer.ByteBuf;
 import static org.apache.bookkeeper.util.BookKeeperConstants.FEATURE_DISABLE_ENSEMBLE_CHANGE;
 
 import java.util.ArrayList;
@@ -27,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.replication.Auditor;
 import org.apache.bookkeeper.util.ReflectionUtils;
@@ -62,6 +64,8 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String CLIENT_CONNECT_TIMEOUT_MILLIS = "clientConnectTimeoutMillis";
     protected final static String NUM_CHANNELS_PER_BOOKIE = "numChannelsPerBookie";
     protected final static String USE_V2_WIRE_PROTOCOL = "useV2WireProtocol";
+    protected final static String NETTY_USE_POOLED_BUFFERS = "nettyUsePooledBuffers";
+
     // Read Parameters
     protected final static String READ_TIMEOUT = "readTimeout";
     protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout";
@@ -1420,4 +1424,30 @@ public class ClientConfiguration extends AbstractConfiguration {
         setProperty(DISABLE_ENSEMBLE_CHANGE_FEATURE_NAME, disableEnsembleChangeFeatureName);
         return this;
     }
+
+
+    /**
+     * Option to use Netty Pooled ByteBufs
+     *
+     * @return the value of the option
+     */
+    public boolean isNettyUsePooledBuffers() {
+        return getBoolean(NETTY_USE_POOLED_BUFFERS, true);
+    }
+
+    /**
+     * Enable/Disable the usage of Pooled Netty buffers. While using v2 wire protocol the
application will be
+     * responsible for releasing ByteBufs returned by BookKeeper
+     *
+     * @param enabled
+     *          if enabled BookKeeper will use default Pooled Netty Buffer allocator
+     *
+     * @see #setUseV2WireProtocol(boolean)
+     * @see ByteBuf#release()
+     * @see LedgerHandle#readEntries(long, long)
+     */
+    public ClientConfiguration setNettyUsePooledBuffers(boolean enabled) {
+        setProperty(NETTY_USE_POOLED_BUFFERS, enabled);
+        return this;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 7ecb0b4..9ba75d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -103,6 +103,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 import com.google.protobuf.ExtensionRegistry;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import java.net.SocketAddress;
 
 import org.apache.bookkeeper.auth.BookKeeperPrincipal;
@@ -306,7 +308,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter
{
             bootstrap.channel(NioSocketChannel.class);
         }
 
-        bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        ByteBufAllocator allocator;
+        if (this.conf.isNettyUsePooledBuffers()) {
+            allocator = PooledByteBufAllocator.DEFAULT;
+        } else {
+            allocator = UnpooledByteBufAllocator.DEFAULT;
+        }
+
+        bootstrap.option(ChannelOption.ALLOCATOR, allocator);
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getClientConnectTimeoutMillis());
         bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                 conf.getClientWriteBufferLowWaterMark(), conf.getClientWriteBufferHighWaterMark()));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index a0d64a4..92ba078 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -1,5 +1,6 @@
 package org.apache.bookkeeper.client;
 
+import io.netty.util.IllegalReferenceCountException;
 import java.util.Collections;
 import java.util.Enumeration;
 
@@ -697,4 +698,140 @@ public class BookKeeperTest extends BaseTestCase {
             }
         }
     }
+
+    @Test(timeout = 60000)
+    public void testReadEntryReleaseByteBufs() throws Exception {
+        ClientConfiguration confWriter = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString());
+        int numEntries = 10;
+        byte[] data = "foobar".getBytes();
+        long ledgerId;
+        try (BookKeeper bkc = new BookKeeper(confWriter)) {
+            try (LedgerHandle lh = bkc.createLedger(digestType, "testPasswd".getBytes()))
{
+                ledgerId = lh.getId();
+                for (int i = 0; i < numEntries; i++) {
+                    lh.addEntry(data);
+                }
+            }
+        }
+
+        // v2 protocol, using pooled buffers
+        ClientConfiguration confReader1 = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setUseV2WireProtocol(true)
+            .setNettyUsePooledBuffers(true);
+        try (BookKeeper bkc = new BookKeeper(confReader1)) {
+            try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, "testPasswd".getBytes()))
{
+                assertEquals(numEntries - 1, lh.readLastConfirmed());
+                for (Enumeration<LedgerEntry> readEntries = lh.readEntries(0, numEntries
- 1);
+                    readEntries.hasMoreElements();) {
+                    LedgerEntry entry = readEntries.nextElement();
+                    assertTrue(entry.data.getClass().getName(),
+                        entry.data.getClass().getName().contains("PooledNonRetainedSlicedByteBuf"));
+                    assertTrue(entry.data.release());
+                    try {
+                        entry.data.release();
+                        fail("ByteBuf already released");
+                    } catch (IllegalReferenceCountException ok) {
+                    }
+                }
+            }
+        }
+
+        // v2 protocol, not using pooled buffers
+        ClientConfiguration confReader2 = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setUseV2WireProtocol(true)
+            .setNettyUsePooledBuffers(false);
+        try (BookKeeper bkc = new BookKeeper(confReader2)) {
+            try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, "testPasswd".getBytes()))
{
+                assertEquals(numEntries - 1, lh.readLastConfirmed());
+                for (Enumeration<LedgerEntry> readEntries = lh.readEntries(0, numEntries
- 1);
+                    readEntries.hasMoreElements();) {
+                    LedgerEntry entry = readEntries.nextElement();
+                    // ButeBufs no reference counter
+                    assertTrue(entry.data.release());
+                    assertTrue(entry.data.getClass().getName(),
+                        entry.data.getClass().getName().contains("UnpooledSlicedByteBuf"));
+                    try {
+                        entry.data.release();
+                        fail("ByteBuf already released");
+                    } catch (IllegalReferenceCountException ok) {
+                    }
+                }
+            }
+        }
+
+        // v3 protocol, not using pooled buffers
+        ClientConfiguration confReader3 = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setUseV2WireProtocol(false)
+            .setNettyUsePooledBuffers(false);
+        try (BookKeeper bkc = new BookKeeper(confReader3)) {
+            try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, "testPasswd".getBytes()))
{
+                assertEquals(numEntries - 1, lh.readLastConfirmed());
+                for (Enumeration<LedgerEntry> readEntries = lh.readEntries(0, numEntries
- 1);
+                    readEntries.hasMoreElements();) {
+                    LedgerEntry entry = readEntries.nextElement();
+                    // ButeBufs not reference counter
+                    assertTrue(entry.data.getClass().getName(),
+                        entry.data.getClass().getName().contains("UnpooledSlicedByteBuf"));
+                    assertTrue(entry.data.release());
+                    try {
+                        entry.data.release();
+                        fail("ByteBuf already released");
+                    } catch (IllegalReferenceCountException ok) {
+                    }
+                }
+            }
+        }
+
+        // v3 protocol, using pooled buffers
+        // v3 protocol from 4.5 always "wraps" buffers returned by protobuf
+        ClientConfiguration confReader4 = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setUseV2WireProtocol(false)
+            .setNettyUsePooledBuffers(true);
+        try (BookKeeper bkc = new BookKeeper(confReader4)) {
+            try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, "testPasswd".getBytes()))
{
+                assertEquals(numEntries - 1, lh.readLastConfirmed());
+                for (Enumeration<LedgerEntry> readEntries = lh.readEntries(0, numEntries
- 1);
+                    readEntries.hasMoreElements();) {
+                    LedgerEntry entry = readEntries.nextElement();
+                    // ButeBufs not reference counter
+                    assertTrue(entry.data.getClass().getName(),
+                        entry.data.getClass().getName().contains("UnpooledSlicedByteBuf"));
+                    assertTrue(entry.data.release());
+                    try {
+                        entry.data.release();
+                        fail("ByteBuf already released");
+                    } catch (IllegalReferenceCountException ok) {
+                    }                }
+            }
+        }
+
+        // cannot read twice an entry
+        ClientConfiguration confReader5 = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString());
+        try (BookKeeper bkc = new BookKeeper(confReader5)) {
+            try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, "testPasswd".getBytes()))
{
+                assertEquals(numEntries - 1, lh.readLastConfirmed());
+                for (Enumeration<LedgerEntry> readEntries = lh.readEntries(0, numEntries
- 1);
+                    readEntries.hasMoreElements();) {
+                    LedgerEntry entry = readEntries.nextElement();
+                    entry.getEntry();
+                    try {
+                        entry.getEntry();
+                        fail("entry data accessed twice");
+                    } catch (IllegalStateException ok){
+                    }
+                    try {
+                        entry.getEntryInputStream();
+                        fail("entry data accessed twice");
+                    } catch (IllegalStateException ok){
+                    }
+                }
+            }
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>'].

Mime
View raw message