bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [1/4] bookkeeper git commit: BOOKKEEPER-1008: Netty 4.1
Date Mon, 15 May 2017 17:52:59 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 811ece53a -> 74f795136


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index dc43b2c..25b0d6b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -17,15 +17,15 @@
  */
 package org.apache.bookkeeper.client;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 public class ClientUtil {
-    public static ChannelBuffer generatePacket(long ledgerId, long entryId, long lastAddConfirmed,
+    public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddConfirmed,
                                                long length, byte[] data) {
         return generatePacket(ledgerId, entryId, lastAddConfirmed, length, data, 0, data.length);
     }
 
-    public static ChannelBuffer generatePacket(long ledgerId, long entryId, long lastAddConfirmed,
+    public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddConfirmed,
                                                long length, byte[] data, int offset, int
len) {
         CRC32DigestManager dm = new CRC32DigestManager(ledgerId);
         return dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index 521d1e3..d77d184 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -165,6 +165,7 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
                     try {
                         while (!finished.get()) {
                             lh.addEntry(entry);
+                            Thread.sleep(1);
                         }
                     } catch (Exception e) {
                         LOG.error("Exception in add entry thread", e);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index ed41cb2..6ff1535 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -24,7 +24,6 @@ package org.apache.bookkeeper.client;
 import static org.junit.Assert.assertTrue;
 
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
 
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -32,18 +31,19 @@ import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
 /**
  * This unit test tests timeout of GetBookieInfo request;
  *
@@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory;
 public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase {
     private final static Logger LOG = LoggerFactory.getLogger(TestGetBookieInfoTimeout.class);
     DigestType digestType;
-    public ClientSocketChannelFactory channelFactory;
+    public EventLoopGroup eventLoopGroup;
     public OrderedSafeExecutor executor;
 
     public TestGetBookieInfoTimeout() {
@@ -62,8 +62,8 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase
{
     @Before
     public void setUp() throws Exception {
         super.setUp();
-        channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors
-                .newCachedThreadPool());
+        eventLoopGroup = new NioEventLoopGroup();
+
         executor = OrderedSafeExecutor.newBuilder()
                 .name("BKClientOrderedSafeExecutor")
                 .numThreads(2)
@@ -72,7 +72,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase
{
 
     @After
     public void tearDown() throws Exception {
-        channelFactory.releaseExternalResources();
+        eventLoopGroup.shutdownGracefully();
         executor.shutdown();
     }
 
@@ -99,7 +99,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase
{
         // try to get bookie info from the sleeping bookie. It should fail with timeout error
         BookieSocketAddress addr = new BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(),
                 bookieToSleep.getPort());
-        BookieClient bc = new BookieClient(cConf, channelFactory, executor);
+        BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor);
         long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
|
                 BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 6739ea4..99f8f5b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import io.netty.util.HashedWheelTimer;
 import junit.framework.TestCase;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
@@ -40,7 +41,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.util.StaticDNSResolver;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +54,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
     final List<Integer> writeSet = new ArrayList<Integer>();
     ClientConfiguration conf = new ClientConfiguration();
     BookieSocketAddress addr1, addr2, addr3, addr4;
-    HashedWheelTimer timer;
+    io.netty.util.HashedWheelTimer timer;
 
     @Override
     protected void setUp() throws Exception {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
index af6a44e..ef90401 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
@@ -39,7 +39,6 @@ import org.apache.bookkeeper.net.CommonConfigurationKeys;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.ScriptBasedMapping;
 import org.apache.bookkeeper.util.Shell;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -50,6 +49,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import io.netty.util.HashedWheelTimer;
+
 /**
  * In this testsuite, ScriptBasedMapping is used as DNS_RESOLVER_CLASS for
  * mapping nodes to racks. Shell Script -

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index 5c61ae3..09c6262 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -27,6 +27,9 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.netty.util.HashedWheelTimer;
+
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.Feature;
@@ -38,7 +41,6 @@ import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.StaticDNSResolver;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
index 5a1f7fc..41f35a7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
@@ -32,6 +32,9 @@ import org.apache.bookkeeper.test.BaseTestCase;
 import org.junit.Assert;
 import org.junit.Test;
 
+import io.netty.channel.Channel;
+import io.netty.channel.local.LocalChannel;
+
 /**
  * Tests of the main BookKeeper client using networkless comunication
  */
@@ -68,12 +71,11 @@ public class NetworkLessBookieTest extends BaseTestCase {
         }
 
         for (BookieServer bk : bs) {
-            for (ChannelManager channel : bk.nettyServer.channels) {
-                if (! (channel instanceof VMLocalChannelManager)) {
+            for (Channel channel : bk.nettyServer.allChannels) {
+                if (!(channel instanceof LocalChannel)) {
                     Assert.fail();
                 }
             }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
index 0a9bdc4..ed2a4b5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
@@ -29,27 +29,22 @@ import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.ExtensionRegistry;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.auth.TestAuth;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
 
 import org.apache.bookkeeper.proto.BookieProtocol.*;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ArrayBlockingQueue;
 
 import static org.junit.Assert.*;
@@ -63,14 +58,14 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase
{
 
     ExtensionRegistry extRegistry = ExtensionRegistry.newInstance();
     ClientAuthProvider.Factory authProvider;
-    ClientSocketChannelFactory channelFactory
-        = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                                            Executors.newCachedThreadPool());
+    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
     OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient")
             .build();
 
     public TestBackwardCompatCMS42() throws Exception {
         super(0);
+
+        baseConf.setGcWaitTime(60000);
         authProvider = AuthProviderFactoryFactory.newClientAuthProviderFactory(
                 new ClientConfiguration());
     }
@@ -179,44 +174,36 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase
{
     }
 
     CompatClient42 newCompatClient(BookieSocketAddress addr) throws Exception {
-        return new CompatClient42(executor, channelFactory, addr, authProvider, extRegistry);
+        return new CompatClient42(executor, eventLoopGroup, addr, authProvider, extRegistry);
     }
 
     // extending PerChannelBookieClient to get the pipeline factory
     class CompatClient42 extends PerChannelBookieClient {
         final ArrayBlockingQueue<Response> responses = new ArrayBlockingQueue<Response>(10);
-        final Channel channel;
+        Channel channel;
         final CountDownLatch connected = new CountDownLatch(1);
 
-        CompatClient42(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
+        CompatClient42(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup,
                        BookieSocketAddress addr,
                        ClientAuthProvider.Factory authProviderFactory,
                        ExtensionRegistry extRegistry) throws Exception {
-            super(executor, channelFactory, addr, authProviderFactory, extRegistry);
-
-            ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
-            bootstrap.setPipelineFactory(this);
-            bootstrap.setOption("tcpNoDelay", false);
-            bootstrap.setOption("keepAlive", true);
-            ChannelFuture f = bootstrap.connect(addr.getSocketAddress()).await();
-            channel = f.getChannel();
+            super(executor, eventLoopGroup, addr, authProviderFactory, extRegistry);
+
+            state = ConnectionState.CONNECTING;
+            ChannelFuture future = connect();
+            future.await();
+            channel = future.channel();
+            connected.countDown();
         }
 
         @Override
-        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
{
-            if (!(e.getMessage() instanceof Response)) {
-                LOG.error("Unknown message {}, passing upstream", e.getMessage());
-                ctx.sendUpstream(e);
+        public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg) throws
Exception {
+            if (!(msg instanceof Response)) {
+                LOG.error("Unknown message {}, passing upstream", msg);
+                ctx.fireChannelRead(msg);
                 return;
             }
-            responses.add((Response)e.getMessage());
-        }
-
-        @Override
-        public void channelConnected(ChannelHandlerContext ctx,
-                                     ChannelStateEvent e)
-                throws Exception {
-            connected.countDown();
+            responses.add((Response) msg);
         }
 
         Response takeResponse() throws Exception {
@@ -229,7 +216,7 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase
{
 
         void sendRequest(Request request) throws Exception {
             connected.await();
-            channel.write(request);
+            channel.writeAndFlush(request);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index 2095b66..16bbfd0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -33,20 +33,20 @@ import org.apache.bookkeeper.proto.PerChannelBookieClient.ConnectionState;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.ExtensionRegistry;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -78,14 +78,12 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase
{
      */
     @Test(timeout=60000)
     public void testConnectCloseRace() throws Exception {
-        ClientSocketChannelFactory channelFactory
-            = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                                                Executors.newCachedThreadPool());
+        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
         OrderedSafeExecutor executor = getOrderedSafeExecutor();
 
         BookieSocketAddress addr = getBookie(0);
         for (int i = 0; i < 1000; i++) {
-            PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory,
addr,
+            PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup,
addr,
                     authProvider, extRegistry);
             client.connectIfNeededAndDoOp(new GenericCallback<PerChannelBookieClient>()
{
                     @Override
@@ -96,7 +94,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase
{
                 });
             client.close();
         }
-        channelFactory.releaseExternalResources();
+        eventLoopGroup.shutdownGracefully();
         executor.shutdown();
     }
 
@@ -123,21 +121,19 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase
{
                 // we just want to trigger it connecting.
             }
         };
-        ClientSocketChannelFactory channelFactory
-            = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                                                Executors.newCachedThreadPool());
+        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
         OrderedSafeExecutor executor = getOrderedSafeExecutor();
 
         BookieSocketAddress addr = getBookie(0);
         for (int i = 0; i < 100; i++) {
-            PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory,
addr,
+            PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup,
addr,
                                                                        authProvider, extRegistry);
             for (int j = i; j < 10; j++) {
                 client.connectIfNeededAndDoOp(nullop);
             }
             client.close();
         }
-        channelFactory.releaseExternalResources();
+        eventLoopGroup.shutdownGracefully();
         executor.shutdown();
     }
 
@@ -157,16 +153,13 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase
{
             }
         };
         final int ITERATIONS = 100000;
-        ClientSocketChannelFactory channelFactory
-            = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                                                Executors.newCachedThreadPool());
+        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
         OrderedSafeExecutor executor = getOrderedSafeExecutor();
         BookieSocketAddress addr = getBookie(0);
 
-        final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory,
+        final PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup,
                 addr, authProvider, extRegistry);
         final AtomicBoolean shouldFail = new AtomicBoolean(false);
-        final AtomicBoolean inconsistent = new AtomicBoolean(false);
         final AtomicBoolean running = new AtomicBoolean(true);
         final CountDownLatch disconnectRunning = new CountDownLatch(1);
         Thread connectThread = new Thread() {
@@ -206,12 +199,12 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase
{
 
                             if ((state == ConnectionState.CONNECTED
                                  && (channel == null
-                                     || !channel.isConnected()))
+                                     || !channel.isActive()))
                                 || (state != ConnectionState.CONNECTED
                                     && channel != null
-                                    && channel.isConnected())) {
+                                    && channel.isActive())) {
                                 LOG.error("State({}) and channel({}) inconsistent " + channel,
-                                          state, channel == null ? null : channel.isConnected());
+                                          state, channel == null ? null : channel.isActive());
                                 shouldFail.set(true);
                                 running.set(false);
                             }
@@ -228,7 +221,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase
{
         checkThread.join();
         assertFalse("Failure in threads, check logs", shouldFail.get());
         client.close();
-        channelFactory.releaseExternalResources();
+        eventLoopGroup.shutdownGracefully();
         executor.shutdown();
     }
 
@@ -255,19 +248,17 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase
{
         bsConfs.add(conf);
         bs.add(startBookie(conf, delayBookie));
 
-        ClientSocketChannelFactory channelFactory
-            = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                                                Executors.newCachedThreadPool());
+        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
         final OrderedSafeExecutor executor = getOrderedSafeExecutor();
         BookieSocketAddress addr = getBookie(0);
 
-        final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory,
+        final PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup,
                 addr, authProvider, extRegistry);
         final CountDownLatch completion = new CountDownLatch(1);
         final ReadEntryCallback cb = new ReadEntryCallback() {
                 @Override
                 public void readEntryComplete(int rc, long ledgerId, long entryId,
-                                              ChannelBuffer buffer, Object ctx) {
+                    ByteBuf buffer, Object ctx) {
                     completion.countDown();
                 }
             };
@@ -292,7 +283,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase
{
         Thread.sleep(1000);
         client.disconnect();
         client.close();
-        channelFactory.releaseExternalResources();
+        eventLoopGroup.shutdownGracefully();
         executor.shutdown();
 
         assertTrue("Request should have completed", completion.await(5, TimeUnit.SECONDS));

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index efb8375..a71dcb5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -446,6 +446,7 @@ public abstract class BookKeeperClusterTestCase {
             throws Exception {
         ServerConfiguration conf = newServerConfiguration();
         bsConfs.add(conf);
+        LOG.info("Starting new bookie on port: {}", conf.getBookiePort());
         bs.add(startBookie(conf));
 
         return conf.getBookiePort();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 0698780..3d9a540 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -21,12 +21,16 @@ package org.apache.bookkeeper.test;
  *
  */
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.Code;
@@ -44,24 +48,18 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.IOUtils;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.*;
 
 public class BookieClientTest {
-    private final static Logger LOG = LoggerFactory.getLogger(BookieClientTest.class);
     BookieServer bs;
     File tmpDir;
     public int port = 13645;
-    public ClientSocketChannelFactory channelFactory;
+
+    public EventLoopGroup eventLoopGroup;
     public OrderedSafeExecutor executor;
     ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
 
@@ -77,8 +75,7 @@ public class BookieClientTest {
             .setLedgerDirNames(new String[] { tmpDir.getPath() });
         bs = new BookieServer(conf);
         bs.start();
-        channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors
-                .newCachedThreadPool());
+        eventLoopGroup = new NioEventLoopGroup();
         executor = OrderedSafeExecutor.newBuilder()
                 .name("BKClientOrderedSafeExecutor")
                 .numThreads(2)
@@ -89,7 +86,7 @@ public class BookieClientTest {
     public void tearDown() throws Exception {
         bs.shutdown();
         recursiveDelete(tmpDir);
-        channelFactory.releaseExternalResources();
+        eventLoopGroup.shutdownGracefully();
         executor.shutdown();
     }
 
@@ -110,13 +107,13 @@ public class BookieClientTest {
 
     ReadEntryCallback recb = new ReadEntryCallback() {
 
-        public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer
bb, Object ctx) {
+        public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf bb, Object
ctx) {
             ResultStruct rs = (ResultStruct) ctx;
             synchronized (rs) {
                 rs.rc = rc;
                 if (BKException.Code.OK == rc && bb != null) {
                     bb.readerIndex(24);
-                    rs.entry = bb.toByteBuffer();
+                    rs.entry = bb.nioBuffer();
                 }
                 rs.notifyAll();
             }
@@ -146,9 +143,8 @@ public class BookieClientTest {
         BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
         ResultStruct arc = new ResultStruct();
 
-        BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor);
-        ChannelBuffer bb;
-        bb = createByteBuffer(1, 1, 1);
+        BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor);
+        ByteBuf bb = createByteBuffer(1, 1, 1);
         bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE);
         synchronized (arc) {
             arc.wait(1000);
@@ -234,22 +230,20 @@ public class BookieClientTest {
         }
     }
 
-    private ChannelBuffer createByteBuffer(int i, long lid, long eid) {
-        ByteBuffer bb;
-        bb = ByteBuffer.allocate(4 + 24);
-        bb.putLong(lid);
-        bb.putLong(eid);
-        bb.putLong(eid-1);
-        bb.putInt(i);
-        bb.flip();
-        return ChannelBuffers.wrappedBuffer(bb);
+    private ByteBuf createByteBuffer(int i, long lid, long eid) {
+        ByteBuf bb = Unpooled.buffer(4 + 16);
+        bb.writeLong(lid);
+        bb.writeLong(eid);
+        bb.writeLong(eid - 1);
+        bb.writeInt(i);
+        return bb;
     }
 
     @Test(timeout=60000)
     public void testNoLedger() throws Exception {
         ResultStruct arc = new ResultStruct();
         BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
-        BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor);
+        BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor);
         synchronized (arc) {
             bc.readEntry(addr, 2, 13, recb, arc);
             arc.wait(1000);
@@ -260,7 +254,7 @@ public class BookieClientTest {
     @Test(timeout=60000)
     public void testGetBookieInfo() throws IOException, InterruptedException {
         BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port);
-        BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor);
+        BookieClient bc = new BookieClient(new ClientConfiguration(), new NioEventLoopGroup(),
executor);
         long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
|
                 BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
index 4d29ff4..5f1f839 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
@@ -260,7 +260,7 @@ public class BookieReadWriteTest extends MultiLedgerManagerMultiDigestTestCase
         }
     }
 
-    @Test
+    @Test(timeout=60000)
     public void testReadWriteAsyncSingleClient200() throws IOException {
         testReadWriteAsyncSingleClient(200);
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
index 3a36129..771628b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
@@ -21,9 +21,12 @@ package org.apache.bookkeeper.test;
  *
  */
 
+import io.netty.buffer.Unpooled;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.concurrent.Executors;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -31,9 +34,6 @@ import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,8 +67,8 @@ class LoopbackClient implements WriteCallback {
         }
     }
 
-    LoopbackClient(ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor,
long begin, int limit) throws IOException {
-        this.client = new BookieClient(new ClientConfiguration(), channelFactory, executor);
+    LoopbackClient(EventLoopGroup eventLoopGroup, OrderedSafeExecutor executor, long begin,
int limit) throws IOException {
+        this.client = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor);
         this.begin = begin;
     }
 
@@ -78,7 +78,7 @@ class LoopbackClient implements WriteCallback {
         byte[] passwd = new byte[20];
         Arrays.fill(passwd, (byte) 'a');
 
-        client.addEntry(addr, ledgerId, passwd, entry, ChannelBuffers.wrappedBuffer(data),
cb, ctx, BookieProtocol.FLAG_NONE);
+        client.addEntry(addr, ledgerId, passwd, entry, Unpooled.wrappedBuffer(data), cb,
ctx, BookieProtocol.FLAG_NONE);
     }
 
     public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr,
Object ctx) {
@@ -94,15 +94,14 @@ class LoopbackClient implements WriteCallback {
         long begin = System.currentTimeMillis();
 
         LoopbackClient lb;
-        ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors
-                .newCachedThreadPool());
+        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
         OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
                 .name("BookieClientScheduler")
                 .numThreads(2)
                 .build();
         try {
             BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", Integer.valueOf(args[2]).intValue());
-            lb = new LoopbackClient(channelFactory, executor, begin, limit.intValue());
+            lb = new LoopbackClient(eventLoopGroup, executor, begin, limit.intValue());
 
             for (int i = 0; i < limit; i++) {
                 lb.write(ledgerId, i, data, addr, lb, c);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
new file mode 100644
index 0000000..b619d0e
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java
@@ -0,0 +1,121 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.bookkeeper.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class DoubleByteBufTest {
+
+    @Test(timeout = 30000)
+    public void testGetBytes() {
+        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 });
+        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 4, 5, 6 });
+        doTest(b1, b2);
+    }
+
+    @Test(timeout = 30000)
+    public void testGetBytesWithDoubleByteBufAssource() {
+        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
+        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
+        ByteBuf b3 = Unpooled.wrappedBuffer(new byte[] { 5, 6 });
+
+        ByteBuf b23 = DoubleByteBuf.get(b2, b3);
+        doTest(b1, b23);
+    }
+
+    @Test(timeout = 30000)
+    public void testGetBytesWithIndex() {
+        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 });
+        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 9, 9, 4, 5, 6 });
+
+        // Skip the two '9' from b2
+        b2.readByte();
+        b2.readByte();
+
+        doTest(b1, b2);
+    }
+
+    private void doTest(ByteBuf b1, ByteBuf b2) {
+        ByteBuf buf = DoubleByteBuf.get(b1, b2);
+
+        assertEquals(6, buf.readableBytes());
+        assertEquals(0, buf.writableBytes());
+
+        ByteBuf dst1 = Unpooled.buffer(6);
+        buf.getBytes(0, dst1);
+        assertEquals(6, dst1.readableBytes());
+        assertEquals(0, dst1.writableBytes());
+        assertEquals(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6 }), dst1);
+
+        ByteBuf dst2 = Unpooled.buffer(6);
+        buf.getBytes(0, dst2, 4);
+        assertEquals(4, dst2.readableBytes());
+        assertEquals(2, dst2.writableBytes());
+        assertEquals(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3, 4 }), dst2);
+
+        ByteBuf dst3 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 });
+        buf.getBytes(0, dst3, 1, 4);
+        assertEquals(6, dst3.readableBytes());
+        assertEquals(0, dst3.writableBytes());
+        assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 0 }), dst3);
+
+        ByteBuf dst4 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 });
+        buf.getBytes(2, dst4, 1, 3);
+        assertEquals(6, dst4.readableBytes());
+        assertEquals(0, dst4.writableBytes());
+        assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 3, 4, 5, 0, 0 }), dst4);
+
+        ByteBuf dst5 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 });
+        buf.getBytes(3, dst5, 1, 3);
+        assertEquals(6, dst5.readableBytes());
+        assertEquals(0, dst5.writableBytes());
+        assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 4, 5, 6, 0, 0 }), dst5);
+    }
+
+    @Test(timeout = 30000)
+    public void testCopyToArray() {
+        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
+        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
+        ByteBuf b = DoubleByteBuf.get(b1, b2);
+
+        byte[] a1 = new byte[4];
+        b.getBytes(0, a1);
+        assertArrayEquals(new byte[] { 1, 2, 3, 4 }, a1);
+
+        byte[] a2 = new byte[3];
+        b.getBytes(1, a2);
+        assertArrayEquals(new byte[] { 2, 3, 4 }, a2);
+    }
+
+    @Test(timeout = 30000)
+    public void testToByteBuffer() {
+        ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 });
+        ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 });
+        ByteBuf b = DoubleByteBuf.get(b1, b2);
+
+        assertEquals(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }), b.nioBuffer());
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 73f62b4..a892cef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <protobuf.version>2.6.1</protobuf.version>
     <guava.version>13.0.1</guava.version>
-    <netty.version>3.9.4.Final</netty.version>
+    <netty.version>4.1.10.Final</netty.version>
     <zookeeper.version>3.5.1-alpha</zookeeper.version>
   </properties>
   <url>http://zookeeper.apache.org/bookkeeper</url>


Mime
View raw message