zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [1/2] zookeeper git commit: ZOOKEEPER-3152: Port ZK netty stack to netty4
Date Thu, 22 Nov 2018 16:56:09 GMT
Repository: zookeeper
Updated Branches:
  refs/heads/master 1507f67a0 -> caca06276


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
index 15f993c..7c51a12 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
@@ -23,6 +23,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.TestByteBufAllocator;
 import org.apache.zookeeper.server.quorum.BufferStats;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.Assert;
@@ -31,9 +32,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -48,9 +51,17 @@ public class NettyServerCnxnTest extends ClientBase {
     public void setUp() throws Exception {
         System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
                 "org.apache.zookeeper.server.NettyServerCnxnFactory");
+        NettyServerCnxnFactory.setTestAllocator(TestByteBufAllocator.getInstance());
         super.setUp();
     }
 
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        NettyServerCnxnFactory.clearTestAllocator();
+        TestByteBufAllocator.checkForLeaks();
+    }
+
     /**
      * Test verifies the channel closure - while closing the channel
      * servercnxnfactory should remove all channel references to avoid
@@ -110,6 +121,66 @@ public class NettyServerCnxnTest extends ClientBase {
 
             assertThat("Last client response size should be greater than 0 after client request was performed",
                     clientResponseStats.getLastBufferSize(), greaterThan(0));
+
+            byte[] contents = zk.getData("/a", null, null);
+            assertArrayEquals("unexpected data", "test".getBytes(), contents);
+        }
+    }
+
+    @Test
+    public void testServerSideThrottling() throws IOException, InterruptedException, KeeperException {
+        try (ZooKeeper zk = createClient()) {
+            BufferStats clientResponseStats = serverFactory.getZooKeeperServer().serverStats().getClientResponseStats();
+            assertThat("Last client response size should be initialized with INIT_VALUE",
+                    clientResponseStats.getLastBufferSize(), equalTo(BufferStats.INIT_VALUE));
+
+            zk.create("/a", "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+
+            assertThat("Last client response size should be greater than 0 after client request was performed",
+                    clientResponseStats.getLastBufferSize(), greaterThan(0));
+
+            for (final ServerCnxn cnxn : serverFactory.cnxns) {
+                final NettyServerCnxn nettyCnxn = ((NettyServerCnxn) cnxn);
+                // Disable receiving data for all open connections ...
+                nettyCnxn.disableRecv();
+                // ... then force a throttled read after 1 second (this puts the read into queuedBuffer) ...
+                nettyCnxn.getChannel().eventLoop().schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        nettyCnxn.getChannel().read();
+                    }
+                }, 1, TimeUnit.SECONDS);
+
+                // ... and finally disable throttling after 2 seconds.
+                nettyCnxn.getChannel().eventLoop().schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        nettyCnxn.enableRecv();
+                    }
+                }, 2, TimeUnit.SECONDS);
+            }
+
+            byte[] contents = zk.getData("/a", null, null);
+            assertArrayEquals("unexpected data", "test".getBytes(), contents);
+
+            // As above, but don't do the throttled read. Make the request bytes wait in the socket
+            // input buffer until after throttling is turned off. Need to make sure both modes work.
+            for (final ServerCnxn cnxn : serverFactory.cnxns) {
+                final NettyServerCnxn nettyCnxn = ((NettyServerCnxn) cnxn);
+                // Disable receiving data for all open connections ...
+                nettyCnxn.disableRecv();
+                // ... then disable throttling after 2 seconds.
+                nettyCnxn.getChannel().eventLoop().schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        nettyCnxn.enableRecv();
+                    }
+                }, 2, TimeUnit.SECONDS);
+            }
+
+            contents = zk.getData("/a", null, null);
+            assertArrayEquals("unexpected data", "test".getBytes(), contents);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java
index 6373bb3..c337e3c 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java
@@ -855,6 +855,7 @@ public class ClientTest extends ClientBase {
         // Sending a nonexisting opcode should cause the server to disconnect
         Assert.assertTrue("failed to disconnect",
                 clientDisconnected.await(5000, TimeUnit.MILLISECONDS));
+        zk.close();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java
index 684d67a..bbcf869 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java
@@ -22,7 +22,9 @@ import org.apache.zookeeper.ClientCnxnSocketNetty;
 import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.server.NettyServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -46,4 +48,15 @@ public class NettyNettySuiteBase {
         System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
         System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
     }
+
+    @Before
+    public void setUpTest() throws Exception {
+        TestByteBufAllocatorTestHelper.setTestAllocator(TestByteBufAllocator.getInstance());
+    }
+
+    @After
+    public void tearDownTest() throws Exception {
+        TestByteBufAllocatorTestHelper.clearTestAllocator();
+        TestByteBufAllocator.checkForLeaks();
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java
index 5725c17..836eaa0 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java
@@ -20,7 +20,9 @@ package org.apache.zookeeper.test;
 
 import org.apache.zookeeper.server.NettyServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -41,4 +43,15 @@ public class NioNettySuiteBase {
     public static void tearDown() {
         System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
     }
+
+    @Before
+    public void setUpTest() throws Exception {
+        TestByteBufAllocatorTestHelper.setTestAllocator(TestByteBufAllocator.getInstance());
+    }
+
+    @After
+    public void tearDownTest() throws Exception {
+        TestByteBufAllocatorTestHelper.clearTestAllocator();
+        TestByteBufAllocator.checkForLeaks();
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
index 8d10dc9..7b39ab1 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
@@ -60,6 +60,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
             .getLogger(ReconfigTest.class);
 
     private QuorumUtil qu;
+    private ZooKeeper[] zkArr;
+    private ZooKeeperAdmin[] zkAdminArr;
 
     @Before
     public void setup() {
@@ -70,6 +72,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
 
     @After
     public void tearDown() throws Exception {
+        closeAllHandles(zkArr, zkAdminArr);
         if (qu != null) {
             qu.tearDown();
         }
@@ -237,12 +240,16 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
     }
 
     public static void closeAllHandles(ZooKeeper[] zkArr, ZooKeeperAdmin[] zkAdminArr) throws InterruptedException {
-        for (ZooKeeper zk : zkArr)
-            if (zk != null)
-                zk.close();
-        for (ZooKeeperAdmin zkAdmin : zkAdminArr)
-            if (zkAdmin != null)
-                zkAdmin.close();
+        if (zkArr != null) {
+            for (ZooKeeper zk : zkArr)
+                if (zk != null)
+                    zk.close();
+        }
+        if (zkAdminArr != null) {
+            for (ZooKeeperAdmin zkAdmin : zkAdminArr)
+                if (zkAdmin != null)
+                    zkAdmin.close();
+        }
     }
 
     @Test
@@ -250,8 +257,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         qu = new QuorumUtil(1); // create 3 servers
         qu.disableJMXTest = true;
         qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-        ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+        zkArr = createHandles(qu);
+        zkAdminArr = createAdminHandles(qu);
 
         List<String> leavingServers = new ArrayList<String>();
         List<String> joiningServers = new ArrayList<String>();
@@ -317,8 +324,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
             leavingServers.clear();
             joiningServers.clear();
         }
-
-        closeAllHandles(zkArr, zkAdminArr);
     }
 
     /**
@@ -332,8 +337,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         qu = new QuorumUtil(2); // create 5 servers
         qu.disableJMXTest = true;
         qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-        ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+        zkArr = createHandles(qu);
+        zkAdminArr = createAdminHandles(qu);
 
         List<String> leavingServers = new ArrayList<String>();
         List<String> joiningServers = new ArrayList<String>();
@@ -423,8 +428,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         Assert.assertTrue(qu.getPeer(leavingIndex2).peer.getPeerState() == ServerState.OBSERVING);
         testNormalOperation(zkArr[stayingIndex2], zkArr[leavingIndex2]);
         testServerHasConfig(zkArr[leavingIndex2], joiningServers, null);
-
-        closeAllHandles(zkArr, zkAdminArr);
     }
 
     @Test
@@ -432,8 +435,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         qu = new QuorumUtil(3); // create 7 servers
         qu.disableJMXTest = true;
         qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-        ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+        zkArr = createHandles(qu);
+        zkAdminArr = createAdminHandles(qu);
 
         // new config will have three of the servers as followers
         // two of the servers as observers, and all ports different
@@ -462,8 +465,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         qu.shutdown(4);
         
         testNormalOperation(zkArr[1], zkArr[2]);
-
-        closeAllHandles(zkArr, zkAdminArr);
     }
 
     @Test
@@ -471,8 +472,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         qu = new QuorumUtil(2); 
         qu.disableJMXTest = true;
         qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-        ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+        zkArr = createHandles(qu);
+        zkAdminArr = createAdminHandles(qu);
 
         List<String> leavingServers = new ArrayList<String>();
        
@@ -493,8 +494,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         testNormalOperation(zkArr[1], zkArr[2]);       
         for (int i=1; i<=5; i++)
             testServerHasConfig(zkArr[i], null, leavingServers);
-
-        closeAllHandles(zkArr, zkAdminArr);
     }
 
     @SuppressWarnings("unchecked")
@@ -512,8 +511,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         qu = new QuorumUtil(1); // create 3 servers
         qu.disableJMXTest = true;
         qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-        ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+        zkArr = createHandles(qu);
+        zkAdminArr = createAdminHandles(qu);
 
         // changing a server's role / port is done by "adding" it with the same
         // id but different role / port
@@ -581,7 +580,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
                 changingIndex = leaderIndex;
             }
         }
-        closeAllHandles(zkArr, zkAdminArr);
     }
 
     @Test
@@ -589,8 +587,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         qu = new QuorumUtil(1); // create 3 servers
         qu.disableJMXTest = true;
         qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-        ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+        zkArr = createHandles(qu);
+        zkAdminArr = createAdminHandles(qu);
 
         List<String> joiningServers = new ArrayList<String>();
 
@@ -705,8 +703,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         testNormalOperation(zkArr[follower2], zkArr[follower1]);
         testServerHasConfig(zkArr[follower1], joiningServers, null);
         testServerHasConfig(zkArr[follower2], joiningServers, null);
-
-        closeAllHandles(zkArr, zkAdminArr);
     }
 
     @Test
@@ -722,8 +718,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         qu = new QuorumUtil(1); // create 3 servers
         qu.disableJMXTest = true;
         qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-        ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+        zkArr = createHandles(qu);
+        zkAdminArr = createAdminHandles(qu);
 
         List<String> joiningServers = new ArrayList<String>();
 
@@ -796,7 +792,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
             testServerHasConfig(zkArr[serverIndex], joiningServers, null);
             Assert.assertEquals(oldClientPort, qu.getPeer(serverIndex).peer.getClientPort());
         }
-        closeAllHandles(zkArr, zkAdminArr);
     }
 
     @Test
@@ -818,8 +813,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         qu = new QuorumUtil(3); // create 7 servers
         qu.disableJMXTest = true;
         qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-        ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+        zkArr = createHandles(qu);
+        zkAdminArr = createAdminHandles(qu);
 
         ArrayList<String> members = new ArrayList<String>();
         members.add("group.1=3:4:5");
@@ -886,8 +881,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
                         + i
                         + " doesn't think the quorum system is a majority quorum system!");
         }
-
-        closeAllHandles(zkArr, zkAdminArr);
     }
     
     @Test
@@ -895,7 +888,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         qu = new QuorumUtil(1); // create 3 servers
         qu.disableJMXTest = true;
         qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
+        zkArr = createHandles(qu);
         testNormalOperation(zkArr[1], zkArr[2]);
         for (int i=1; i<4; i++) {
             String configStr = testServerHasConfig(zkArr[i], null, null);
@@ -914,8 +907,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         qu = new QuorumUtil(1); // create 3 servers
         qu.disableJMXTest = true;
         qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-        ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+        zkArr = createHandles(qu);
+        zkAdminArr = createAdminHandles(qu);
 
         List<String> leavingServers = new ArrayList<String>();
         List<String> joiningServers = new ArrayList<String>();
@@ -980,8 +973,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         // assert remotePeerBean.1 of ReplicatedServer_3
         leavingQS3 = peer3.getView().get(new Long(leavingIndex));
         assertRemotePeerMXBeanAttributes(leavingQS3, remotePeerBean3);
-
-        closeAllHandles(zkArr, zkAdminArr);
     }
 
     /**
@@ -993,8 +984,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         qu = new QuorumUtil(1); // create 3 servers
         qu.disableJMXTest = true;
         qu.startAll();
-        ZooKeeper[] zkArr = createHandles(qu);
-        ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+        zkArr = createHandles(qu);
+        zkAdminArr = createAdminHandles(qu);
 
         // changing a server's role / port is done by "adding" it with the same
         // id but different role / port
@@ -1055,8 +1046,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         // assert remotePeerBean.1 of ReplicatedServer_3
         changingQS3 = peer3.getView().get(new Long(changingIndex));
         assertRemotePeerMXBeanAttributes(changingQS3, remotePeerBean3);
-
-        closeAllHandles(zkArr, zkAdminArr);
     }
 
     private void assertLocalPeerMXBeanAttributes(QuorumPeer qp,

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java
new file mode 100644
index 0000000..dc13222
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+
+/**
+ * This is a custom ByteBufAllocator that tracks outstanding allocations and
+ * crashes the program if any of them are leaked.
+ *
+ * Never use this class in production, it will cause your server to run out
+ * of memory! This is because it holds strong references to all allocated
+ * buffers and doesn't release them until checkForLeaks() is called at the
+ * end of a unit test.
+ *
+ * Note: the original code was copied from https://github.com/airlift/drift,
+ * with the permission and encouragement of airlift's author (dain). Airlift
+ * uses the same apache 2.0 license as Zookeeper so this should be ok.
+ *
+ * However, the code was modified to take advantage of Netty's built-in
+ * leak tracking and make a best effort to print details about buffer leaks.
+ *
+ */
+public class TestByteBufAllocator extends PooledByteBufAllocator {
+    private static AtomicReference<TestByteBufAllocator> INSTANCE =
+            new AtomicReference<>(null);
+
+    /**
+     * Get the singleton testing allocator.
+     * @return the singleton allocator, creating it if one does not exist.
+     */
+    public static TestByteBufAllocator getInstance() {
+        TestByteBufAllocator result = INSTANCE.get();
+        if (result == null) {
+            ResourceLeakDetector.Level oldLevel = ResourceLeakDetector.getLevel();
+            ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+            INSTANCE.compareAndSet(null, new TestByteBufAllocator(oldLevel));
+            result = INSTANCE.get();
+        }
+        return result;
+    }
+
+    /**
+     * Destroys the singleton testing allocator and throws an error if any of the
+     * buffers allocated by it have been leaked. Attempts to print leak details to
+     * standard error before throwing, by using netty's built-in leak tracking.
+     * Note that this might not always work, since it only triggers when a buffer
+     * is garbage-collected and calling System.gc() does not guarantee that a buffer
+     * will actually be GC'ed.
+     *
+     * This should be called at the end of a unit test's tearDown() method.
+     */
+    public static void checkForLeaks() {
+        TestByteBufAllocator result = INSTANCE.getAndSet(null);
+        if (result != null) {
+            result.checkInstanceForLeaks();
+        }
+    }
+
+    private final List<ByteBuf> trackedBuffers = new ArrayList<>();
+    private final ResourceLeakDetector.Level oldLevel;
+
+    private TestByteBufAllocator(ResourceLeakDetector.Level oldLevel)
+    {
+        super(false);
+        this.oldLevel = oldLevel;
+    }
+
+    @Override
+    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity)
+    {
+        return track(super.newHeapBuffer(initialCapacity, maxCapacity));
+    }
+
+    @Override
+    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity)
+    {
+        return track(super.newDirectBuffer(initialCapacity, maxCapacity));
+    }
+
+    @Override
+    public CompositeByteBuf compositeHeapBuffer(int maxNumComponents)
+    {
+        return track(super.compositeHeapBuffer(maxNumComponents));
+    }
+
+    @Override
+    public CompositeByteBuf compositeDirectBuffer(int maxNumComponents)
+    {
+        return track(super.compositeDirectBuffer(maxNumComponents));
+    }
+
+    private synchronized CompositeByteBuf track(CompositeByteBuf byteBuf)
+    {
+        trackedBuffers.add(Objects.requireNonNull(byteBuf));
+        return byteBuf;
+    }
+
+    private synchronized ByteBuf track(ByteBuf byteBuf)
+    {
+        trackedBuffers.add(Objects.requireNonNull(byteBuf));
+        return byteBuf;
+    }
+
+    private void checkInstanceForLeaks()
+    {
+        try {
+            long referencedBuffersCount = 0;
+            synchronized (this) {
+                referencedBuffersCount = trackedBuffers.stream()
+                        .filter(byteBuf -> byteBuf.refCnt() > 0)
+                        .count();
+                // Make tracked buffers eligible for GC
+                trackedBuffers.clear();
+            }
+            // Throw an error if there were any leaked buffers
+            if (referencedBuffersCount > 0) {
+                // Trigger a GC. This will hopefully (but not necessarily) print
+                // details about detected leaks to standard error before the error
+                // is thrown.
+                System.gc();
+                throw new AssertionError("Found a netty ByteBuf leak!");
+            }
+        } finally {
+            ResourceLeakDetector.setLevel(oldLevel);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocatorTestHelper.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocatorTestHelper.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocatorTestHelper.java
new file mode 100644
index 0000000..de5e751
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocatorTestHelper.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import io.netty.buffer.ByteBufAllocator;
+import org.apache.zookeeper.ClientCnxnSocketNetty;
+import org.apache.zookeeper.server.NettyServerCnxnFactory;
+
+/**
+ * Uses reflection to call package-private methods in Netty connection classes
+ * to set/clear the test ByteBufAllocator.
+ */
+public class TestByteBufAllocatorTestHelper {
+    public static void setTestAllocator(ByteBufAllocator allocator)
+            throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+        Method m1 = NettyServerCnxnFactory.class.getDeclaredMethod("setTestAllocator", ByteBufAllocator.class);
+        m1.setAccessible(true);
+        m1.invoke(null, allocator);
+        Method m2 = ClientCnxnSocketNetty.class.getDeclaredMethod("setTestAllocator", ByteBufAllocator.class);
+        m2.setAccessible(true);
+        m2.invoke(null, allocator);
+    }
+
+    public static void clearTestAllocator()
+            throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+        Method m1 = NettyServerCnxnFactory.class.getDeclaredMethod("clearTestAllocator");
+        m1.setAccessible(true);
+        m1.invoke(null);
+        Method m2 = ClientCnxnSocketNetty.class.getDeclaredMethod("clearTestAllocator");
+        m2.setAccessible(true);
+        m2.invoke(null);
+    }
+}
\ No newline at end of file


Mime
View raw message