bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [3/4] bookkeeper git commit: BOOKKEEPER-1008: Netty 4.1
Date Mon, 15 May 2017 17:53:01 GMT
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index d2608e7..419e3aa 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -21,26 +21,26 @@
 package org.apache.bookkeeper.proto;
 
 import com.google.protobuf.ByteString;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.bookkeeper.auth.AuthCallbacks;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
-
 import org.apache.bookkeeper.auth.BookieAuthProvider;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.DefaultExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.bookkeeper.client.ClientConnectionPeer;
@@ -49,7 +49,7 @@ import org.apache.bookkeeper.bookie.BookieConnectionPeer;
 class AuthHandler {
     static final Logger LOG = LoggerFactory.getLogger(AuthHandler.class);
 
-    static class ServerSideHandler extends SimpleChannelHandler {
+    static class ServerSideHandler extends ChannelInboundHandlerAdapter {
         volatile boolean authenticated = false;
         final BookieAuthProvider.Factory authProviderFactory;
         final BookieConnectionPeer connectionPeer;
@@ -62,86 +62,82 @@ class AuthHandler {
         }
 
         @Override
-        public void channelOpen(ChannelHandlerContext ctx,
-                                ChannelStateEvent e) throws Exception {
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
             authProvider = authProviderFactory.newProvider(connectionPeer, new AuthHandshakeCompleteCallback());
-            super.channelOpen(ctx, e);
+            super.channelActive(ctx);
         }
 
         @Override
-        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
             if (authProvider != null) {
                 authProvider.close();
             }
-            super.channelClosed(ctx, e);
+            super.channelInactive(ctx);
         }
 
         @Override
-        public void messageReceived(ChannelHandlerContext ctx,
-                                    MessageEvent e)
-                throws Exception {
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
             if (authProvider == null) {
                 // close the channel, authProvider should only be
                 // null if the other end of line is an InetSocketAddress
                 // anything else is strange, and we don't want to deal
                 // with it
-                ctx.getChannel().close();
+                ctx.channel().close();
                 return;
             }
 
-            Object event = e.getMessage();
             if (authenticated) {
-                super.messageReceived(ctx, e);
-            } else if (event instanceof BookieProtocol.AuthRequest) { // pre-PB-client
-                BookieProtocol.AuthRequest req = (BookieProtocol.AuthRequest)event;
+                super.channelRead(ctx, msg);
+            } else if (msg instanceof BookieProtocol.AuthRequest) { // pre-PB-client
+                BookieProtocol.AuthRequest req = (BookieProtocol.AuthRequest) msg;
                 assert (req.getOpCode() == BookieProtocol.AUTH);
-                if (checkAuthPlugin(req.getAuthMessage(), ctx.getChannel())) {
+                if (checkAuthPlugin(req.getAuthMessage(), ctx.channel())) {
                     byte[] payload = req
                         .getAuthMessage()
                         .getPayload()
                         .toByteArray();
                     authProvider.process(AuthToken.wrap(payload),
-                                new AuthResponseCallbackLegacy(req, ctx.getChannel()));
+                                new AuthResponseCallbackLegacy(req, ctx.channel()));
                 } else {
-                    ctx.getChannel().close();
+                    ctx.channel().close();
                 }
-            } else if (event instanceof BookieProtocol.Request) {
-                BookieProtocol.Request req = (BookieProtocol.Request)event;
+            } else if (msg instanceof BookieProtocol.Request) {
+                BookieProtocol.Request req = (BookieProtocol.Request) msg;
                 if (req.getOpCode() == BookieProtocol.ADDENTRY) {
-                    ctx.getChannel().write(
+                    ctx.channel().writeAndFlush(
                             new BookieProtocol.AddResponse(
                                     req.getProtocolVersion(), BookieProtocol.EUA,
                                     req.getLedgerId(), req.getEntryId()));
                 } else if (req.getOpCode() == BookieProtocol.READENTRY) {
-                    ctx.getChannel().write(
+                    ctx.channel().writeAndFlush(
                             new BookieProtocol.ReadResponse(
                                     req.getProtocolVersion(), BookieProtocol.EUA,
                                     req.getLedgerId(), req.getEntryId()));
                 } else {
-                    ctx.getChannel().close();
+                    ctx.channel().close();
                 }
-            } else if (event instanceof BookkeeperProtocol.Request) { // post-PB-client
-                BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)event;
+            } else if (msg instanceof BookkeeperProtocol.Request) { // post-PB-client
+                BookkeeperProtocol.Request req = (BookkeeperProtocol.Request) msg;
                 if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH
                         && req.hasAuthRequest()
-                        && checkAuthPlugin(req.getAuthRequest(), ctx.getChannel())) {
+                        && checkAuthPlugin(req.getAuthRequest(), ctx.channel())) {
                     byte[] payload = req
                         .getAuthRequest()
                         .getPayload()
                         .toByteArray();
                     authProvider.process(AuthToken.wrap(payload),
-                                         new AuthResponseCallback(req, ctx.getChannel(), authProviderFactory.getPluginName()));
+                                         new AuthResponseCallback(req, ctx.channel(), authProviderFactory.getPluginName()));
                 } else {
                     BookkeeperProtocol.Response.Builder builder
                         = BookkeeperProtocol.Response.newBuilder()
                         .setHeader(req.getHeader())
                         .setStatus(BookkeeperProtocol.StatusCode.EUA);
 
-                    ctx.getChannel().write(builder.build());
+                    ctx.channel().writeAndFlush(builder.build());
                 }
             } else {
                 // close the channel, junk coming over it
-                ctx.getChannel().close();
+                ctx.channel().close();
             }
         }
 
@@ -177,7 +173,7 @@ class AuthHandler {
                         .setAuthPluginName(req.authMessage.getAuthPluginName())
                         .setPayload(ByteString.copyFrom(newam.getData()))
                         .build();
-                channel.write(new BookieProtocol.AuthResponse(req.getProtocolVersion(),
+                channel.writeAndFlush(new BookieProtocol.AuthResponse(req.getProtocolVersion(),
                                                               message));
             }
         }
@@ -202,7 +198,7 @@ class AuthHandler {
                     LOG.error("Error processing auth message, closing connection");
 
                     builder.setStatus(BookkeeperProtocol.StatusCode.EUA);
-                    channel.write(builder.build());
+                    channel.writeAndFlush(builder.build());
                     channel.close();
                     return;
                 } else {
@@ -214,7 +210,7 @@ class AuthHandler {
                             .build();
                     builder.setStatus(BookkeeperProtocol.StatusCode.EOK)
                         .setAuthResponse(message);
-                    channel.write(builder.build());
+                    channel.writeAndFlush(builder.build());
                 }
             }
         }
@@ -232,12 +228,12 @@ class AuthHandler {
         }
     }
 
-    static class ClientSideHandler extends SimpleChannelHandler {
+    static class ClientSideHandler extends ChannelDuplexHandler {
         volatile boolean authenticated = false;
         final ClientAuthProvider.Factory authProviderFactory;
         ClientAuthProvider authProvider;
         final AtomicLong transactionIdGenerator;
-        final Queue<MessageEvent> waitingForAuth = new ConcurrentLinkedQueue<MessageEvent>();
+        final Queue<Object> waitingForAuth = new ConcurrentLinkedQueue<>();
         final ClientConnectionPeer connectionPeer;
 
         ClientSideHandler(ClientAuthProvider.Factory authProviderFactory,
@@ -250,36 +246,30 @@ class AuthHandler {
         }
 
         @Override
-        public void channelConnected(ChannelHandlerContext ctx,
-                                     ChannelStateEvent e)
-                throws Exception {
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
             authProvider = authProviderFactory.newProvider(connectionPeer,
                         new AuthHandshakeCompleteCallback(ctx));
             authProvider.init(new AuthRequestCallback(ctx, authProviderFactory.getPluginName()));
 
-            super.channelConnected(ctx, e);
+            super.channelActive(ctx);
         }
 
         @Override
-        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
             if (authProvider != null) {
                 authProvider.close();
             }
-            super.channelClosed(ctx, e);
+            super.channelInactive(ctx);
         }
 
         @Override
-        public void messageReceived(ChannelHandlerContext ctx,
-                                    MessageEvent e)
-                throws Exception {
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
             assert (authProvider != null);
 
-            Object event = e.getMessage();
-
             if (authenticated) {
-                super.messageReceived(ctx, e);
-            } else if (event instanceof BookkeeperProtocol.Response) {
-                BookkeeperProtocol.Response resp = (BookkeeperProtocol.Response)event;
+                super.channelRead(ctx, msg);
+            } else if (msg instanceof BookkeeperProtocol.Response) {
+                BookkeeperProtocol.Response resp = (BookkeeperProtocol.Response) msg;
                 if (resp.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH) {
                     if (resp.getStatus() != BookkeeperProtocol.StatusCode.EOK) {
                         authenticationError(ctx, resp.getStatus().getNumber());
@@ -287,7 +277,7 @@ class AuthHandler {
                         assert (resp.hasAuthResponse());
                         BookkeeperProtocol.AuthMessage am = resp.getAuthResponse();
                         if (AuthProviderFactoryFactory.authenticationDisabledPluginName.equals(am.getAuthPluginName())){
-                            SocketAddress remote  = ctx.getChannel().getRemoteAddress();
+                            SocketAddress remote  = ctx.channel().remoteAddress();
                             LOG.info("Authentication is not enabled."
                                 + "Considering this client {0} authenticated", remote);
                             AuthHandshakeCompleteCallback authHandshakeCompleteCallback
@@ -307,28 +297,26 @@ class AuthHandler {
         }
 
         @Override
-        public void writeRequested(ChannelHandlerContext ctx,
-                                   MessageEvent e)
-                throws Exception {
+        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
             synchronized (this) {
                 if (authenticated) {
-                    super.writeRequested(ctx, e);
-                } else if (e.getMessage() instanceof BookkeeperProtocol.Request) {
+                    super.write(ctx, msg, promise);
+                } else if (msg instanceof BookkeeperProtocol.Request) {
                     // let auth messages through, queue the rest
-                    BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)e.getMessage();
+                    BookkeeperProtocol.Request req = (BookkeeperProtocol.Request) msg;
                     if (req.getHeader().getOperation()
                             == BookkeeperProtocol.OperationType.AUTH) {
-                        super.writeRequested(ctx, e);
+                        super.write(ctx, msg, promise);
                     } else {
-                        waitingForAuth.add(e);
+                        waitingForAuth.add(msg);
                     }
-                } else if (e.getMessage() instanceof BookieProtocol.Request) {
+                } else if (msg instanceof BookieProtocol.Request) {
                     // let auth messages through, queue the rest
-                    BookieProtocol.Request req = (BookieProtocol.Request)e.getMessage();
+                    BookieProtocol.Request req = (BookieProtocol.Request)msg;
                     if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) {
-                        super.writeRequested(ctx, e);
+                        super.write(ctx, msg, promise);
                     } else {
-                        waitingForAuth.add(e);
+                        waitingForAuth.add(msg);
                     }
                 } // else just drop
             }
@@ -340,9 +328,7 @@ class AuthHandler {
 
         void authenticationError(ChannelHandlerContext ctx, int errorCode) {
             LOG.error("Error processing auth message, erroring connection {}", errorCode);
-            ctx.sendUpstream(new DefaultExceptionEvent(ctx.getChannel(),
-                                     new AuthenticationException(
-                                             "Auth failed with error " + errorCode)));
+            ctx.fireExceptionCaught(new AuthenticationException("Auth failed with error " + errorCode));
         }
 
         class AuthRequestCallback implements AuthCallbacks.GenericCallback<AuthToken> {
@@ -351,7 +337,7 @@ class AuthHandler {
             String pluginName;
 
             AuthRequestCallback(ChannelHandlerContext ctx, String pluginName) {
-                this.channel = ctx.getChannel();
+                this.channel = ctx.channel();
                 this.ctx = ctx;
                 this.pluginName = pluginName;
             }
@@ -377,7 +363,7 @@ class AuthHandler {
                     .setHeader(header)
                     .setAuthRequest(message);
 
-                channel.write(builder.build());
+                channel.writeAndFlush(builder.build());
             }
         }
 
@@ -392,10 +378,10 @@ class AuthHandler {
                 if (rc == BKException.Code.OK) {
                     synchronized (this) {
                         authenticated = true;
-                        MessageEvent e = waitingForAuth.poll();
-                        while (e != null) {
-                            ctx.sendDownstream(e);
-                            e = waitingForAuth.poll();
+                        Object msg = waitingForAuth.poll();
+                        while (msg != null) {
+                            ctx.writeAndFlush(msg);
+                            msg = waitingForAuth.poll();
                         }
                     }
                 } else {
@@ -406,6 +392,7 @@ class AuthHandler {
         }
     }
 
+    @SuppressWarnings("serial")
     static class AuthenticationException extends IOException {
         AuthenticationException(String reason) {
             super(reason);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index ce85aef..4fb08e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -48,13 +48,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +55,14 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ExtensionRegistry;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+
 /**
  * Implements the client-side part of the BookKeeper protocol.
  *
@@ -73,7 +74,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
     AtomicLong totalBytesOutstanding = new AtomicLong();
 
     OrderedSafeExecutor executor;
-    ClientSocketChannelFactory channelFactory;
+    EventLoopGroup eventLoopGroup;
     final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
             new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
     final HashedWheelTimer requestTimer;
@@ -89,15 +90,15 @@ public class BookieClient implements PerChannelBookieClientFactory {
 
     private final long bookieErrorThresholdPerInterval;
 
-    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory,
+    public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
             OrderedSafeExecutor executor) throws IOException {
-        this(conf, channelFactory, executor, NullStatsLogger.INSTANCE);
+        this(conf, eventLoopGroup, executor, NullStatsLogger.INSTANCE);
     }
 
-    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory,
+    public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
                         OrderedSafeExecutor executor, StatsLogger statsLogger) throws IOException {
         this.conf = conf;
-        this.channelFactory = channelFactory;
+        this.eventLoopGroup = eventLoopGroup;
         this.executor = executor;
         this.closed = false;
         this.closeLock = new ReentrantReadWriteLock();
@@ -141,7 +142,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
 
     @Override
     public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool) {
-        return new PerChannelBookieClient(conf, executor, channelFactory, address, requestTimer, statsLogger,
+        return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, requestTimer, statsLogger,
                 authProviderFactory, registry, pcbcPool);
     }
 
@@ -172,7 +173,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
     }
 
     public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
-            final long lac, final ChannelBuffer toSend, final WriteLacCallback cb, final Object ctx) {
+            final long lac, final ByteBuf toSend, final WriteLacCallback cb, final Object ctx) {
         closeLock.readLock().lock();
         try {
             final PerChannelBookieClientPool client = lookupClient(addr, lac);
@@ -182,6 +183,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                 return;
             }
 
+            toSend.retain();
             client.obtain(new GenericCallback<PerChannelBookieClient>() {
                 @Override
                 public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
@@ -196,19 +198,20 @@ public class BookieClient implements PerChannelBookieClientFactory {
                         } catch (RejectedExecutionException re) {
                             cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
                         }
-                        return;
+                    } else {
+                        pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
                     }
-                    pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
+
+                    toSend.release();
                 }
-            });
+            }, ledgerId);
         } finally {
             closeLock.readLock().unlock();
         }
     }
 
     public void addEntry(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
-            final long entryId,
-            final ChannelBuffer toSend, final WriteCallback cb, final Object ctx, final int options) {
+            final long entryId, final ByteBuf toSend, final WriteCallback cb, final Object ctx, final int options) {
         closeLock.readLock().lock();
         try {
             final PerChannelBookieClientPool client = lookupClient(addr, entryId);
@@ -218,6 +221,10 @@ public class BookieClient implements PerChannelBookieClientFactory {
                 return;
             }
 
+            // Retain the buffer, since the connection could be obtained after the PendingApp might have already
+            // failed
+            toSend.retain();
+
             client.obtain(new GenericCallback<PerChannelBookieClient>() {
                 @Override
                 public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
@@ -233,11 +240,12 @@ public class BookieClient implements PerChannelBookieClientFactory {
                             cb.writeComplete(getRc(BKException.Code.InterruptedException),
                                     ledgerId, entryId, addr, ctx);
                         }
-                        return;
+                    } else {
+                        pcbc.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options);
                     }
-                    pcbc.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options);
+                    toSend.release();
                 }
-            });
+            }, ledgerId);
         } finally {
             closeLock.readLock().unlock();
         }
@@ -277,7 +285,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                     }
                     pcbc.readEntryAndFenceLedger(ledgerId, masterKey, entryId, cb, ctx);
                 }
-            });
+            }, ledgerId);
         } finally {
             closeLock.readLock().unlock();
         }
@@ -310,7 +318,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                     }
                     pcbc.readLac(ledgerId, cb, ctx);
                 }
-            });
+            }, ledgerId);
         } finally {
             closeLock.readLock().unlock();
         }
@@ -346,7 +354,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                     }
                     pcbc.readEntry(ledgerId, entryId, cb, ctx);
                 }
-            });
+            }, ledgerId);
         } finally {
             closeLock.readLock().unlock();
         }
@@ -379,7 +387,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                     }
                     pcbc.getBookieInfo(requested, cb, ctx);
                 }
-            });
+            }, requested);
         } finally {
             closeLock.readLock().unlock();
         }
@@ -458,26 +466,21 @@ public class BookieClient implements PerChannelBookieClientFactory {
         Counter counter = new Counter();
         byte hello[] = "hello".getBytes(UTF_8);
         long ledger = Long.parseLong(args[2]);
-        ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
-        ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(tfb.setNameFormat(
-                        "BookKeeper-NIOBoss-%d").build()),
-                Executors.newCachedThreadPool(tfb.setNameFormat(
-                        "BookKeeper-NIOWorker-%d").build()));
+        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
         OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
                 .name("BookieClientWorker")
                 .numThreads(1)
                 .build();
-        BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor);
+        BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor);
         BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1]));
 
         for (int i = 0; i < 100000; i++) {
             counter.inc();
-            bc.addEntry(addr, ledger, new byte[0], i, ChannelBuffers.wrappedBuffer(hello), cb, counter, 0);
+            bc.addEntry(addr, ledger, new byte[0], i, Unpooled.wrappedBuffer(hello), cb, counter, 0);
         }
         counter.wait(0);
         System.out.println("Total = " + counter.total());
-        channelFactory.releaseExternalResources();
+        eventLoopGroup.shutdownGracefully();
         executor.shutdown();
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index 2c6dd3a..cf7d419 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -21,36 +21,56 @@
 package org.apache.bookkeeper.proto;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.auth.BookieAuthProvider;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.processor.RequestProcessor;
+import org.apache.commons.lang.SystemUtils;
 import org.apache.zookeeper.KeeperException;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import com.google.protobuf.ExtensionRegistry;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelGroupFuture;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.local.LocalChannel;
+import io.netty.channel.local.LocalServerChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+
 import com.google.common.annotations.VisibleForTesting;
 import java.net.SocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import org.apache.bookkeeper.auth.BookKeeperPrincipal;
 import org.apache.bookkeeper.bookie.BookieConnectionPeer;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 
 /**
  * Netty server for serving bookie requests
@@ -61,12 +81,14 @@ class BookieNettyServer {
 
     final int maxFrameSize;
     final ServerConfiguration conf;
-    final List<ChannelManager> channels = new ArrayList<>();
+    final EventLoopGroup eventLoopGroup;
+    final EventLoopGroup jvmEventLoopGroup;
     final RequestProcessor requestProcessor;
-    final ChannelGroup allChannels = new CleanupChannelGroup();
     final AtomicBoolean isRunning = new AtomicBoolean(false);
     final Object suspensionLock = new Object();
-    boolean suspended = false;
+    volatile boolean suspended = false;
+    ChannelGroup allChannels;
+    final BookieSocketAddress bookieAddress;
 
     final BookieAuthProvider.Factory authProviderFactory;
     final BookieProtoEncoding.ResponseEncoder responseEncoder;
@@ -85,24 +107,42 @@ class BookieNettyServer {
         requestDecoder = new BookieProtoEncoding.RequestDecoder(registry);
 
         if (!conf.isDisableServerSocketBind()) {
-            channels.add(new NioServerSocketChannelManager());
+            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("bookie-io-%s").build();
+            final int numThreads = Runtime.getRuntime().availableProcessors() * 2;
+
+            EventLoopGroup eventLoopGroup;
+            if (SystemUtils.IS_OS_LINUX) {
+                try {
+                    eventLoopGroup = new EpollEventLoopGroup(numThreads, threadFactory);
+                } catch (ExceptionInInitializerError | NoClassDefFoundError | UnsatisfiedLinkError e) {
+                    LOG.warn("Could not use Netty Epoll event loop for bookie server: {}", e.getMessage());
+                    eventLoopGroup = new NioEventLoopGroup(numThreads, threadFactory);
+                }
+            } else {
+                eventLoopGroup = new NioEventLoopGroup(numThreads, threadFactory);
+            }
+
+            this.eventLoopGroup = eventLoopGroup;
+            allChannels = new CleanupChannelGroup(eventLoopGroup);
+        } else {
+            this.eventLoopGroup = null;
         }
+
         if (conf.isEnableLocalTransport()) {
-            channels.add(new VMLocalChannelManager());
+            jvmEventLoopGroup = new DefaultEventLoopGroup();
+            allChannels = new CleanupChannelGroup(jvmEventLoopGroup);
+        } else {
+            jvmEventLoopGroup = null;
         }
-        try {
-            for (ChannelManager channel : channels) {
-                Channel nettyChannel = channel.start(conf, new BookiePipelineFactory());
-                allChannels.add(nettyChannel);
-            }
-        } catch (IOException bindError) {
-            // clean up all the channels, if this constructor throws an exception the caller code will
-            // not be able to call close(), leading to a resource leak 
-            for (ChannelManager channel : channels) {
-                channel.close();
-            }
-            throw bindError;
+
+        bookieAddress = Bookie.getBookieAddress(conf);
+        InetSocketAddress bindAddress;
+        if (conf.getListeningInterface() == null) {
+            bindAddress = new InetSocketAddress(conf.getBookiePort());
+        } else {
+            bindAddress = bookieAddress.getSocketAddress();
         }
+        listenOn(bindAddress, bookieAddress);
     }
 
     boolean isRunning() {
@@ -113,7 +153,19 @@ class BookieNettyServer {
     void suspendProcessing() {
         synchronized (suspensionLock) {
             suspended = true;
-            allChannels.setReadable(false).awaitUninterruptibly();
+            for (Channel channel : allChannels) {
+                // To suspend processing in the bookie, submit a task
+                // that keeps the event loop busy until resume is
+                // explicitely invoked
+                channel.eventLoop().submit(() -> {
+                    while (suspended && isRunning()) {
+                        try {
+                            Thread.sleep(10);
+                        } catch (InterruptedException e) {
+                        }
+                    }
+                });
+            }
         }
     }
 
@@ -121,11 +173,111 @@ class BookieNettyServer {
     void resumeProcessing() {
         synchronized (suspensionLock) {
             suspended = false;
-            allChannels.setReadable(true).awaitUninterruptibly();
+            for (Channel channel : allChannels) {
+                channel.config().setAutoRead(true);
+            }
             suspensionLock.notifyAll();
         }
     }
 
+    private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddress) throws InterruptedException {
+        if (!conf.isDisableServerSocketBind()) {
+            ServerBootstrap bootstrap = new ServerBootstrap();
+            bootstrap.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
+            bootstrap.group(eventLoopGroup, eventLoopGroup);
+            bootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay());
+            bootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger());
+            bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
+                    new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
+                            conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
+
+            if (eventLoopGroup instanceof EpollEventLoopGroup) {
+                bootstrap.channel(EpollServerSocketChannel.class);
+            } else {
+                bootstrap.channel(NioServerSocketChannel.class);
+            }
+
+            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel ch) throws Exception {
+                    synchronized (suspensionLock) {
+                        while (suspended) {
+                            suspensionLock.wait();
+                        }
+                    }
+
+                    BookieSideConnectionPeerContextHandler contextHandler = new BookieSideConnectionPeerContextHandler();
+                    ChannelPipeline pipeline = ch.pipeline();
+
+                    pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
+                    pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+
+                    pipeline.addLast("bookieProtoDecoder", requestDecoder);
+                    pipeline.addLast("bookieProtoEncoder", responseEncoder);
+                    pipeline.addLast("bookieAuthHandler", new AuthHandler.ServerSideHandler(contextHandler.getConnectionPeer(), authProviderFactory));
+
+                    ChannelInboundHandler requestHandler = isRunning.get()
+                            ? new BookieRequestHandler(conf, requestProcessor, allChannels) : new RejectRequestHandler();
+                    pipeline.addLast("bookieRequestHandler", requestHandler);
+
+                }
+            });
+
+            // Bind and start to accept incoming connections
+            bootstrap.bind(address.getAddress(), address.getPort()).sync();
+        }
+
+        if (conf.isEnableLocalTransport()) {
+            ServerBootstrap jvmBootstrap = new ServerBootstrap();
+            jvmBootstrap.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
+            jvmBootstrap.group(jvmEventLoopGroup, jvmEventLoopGroup);
+            jvmBootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay());
+            jvmBootstrap.childOption(ChannelOption.SO_KEEPALIVE, conf.getServerSockKeepalive());
+            jvmBootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger());
+            jvmBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
+                    new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
+                            conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
+
+            if (jvmEventLoopGroup instanceof DefaultEventLoopGroup) {
+                jvmBootstrap.channel(LocalServerChannel.class);
+            } else if (jvmEventLoopGroup instanceof EpollEventLoopGroup) {
+                jvmBootstrap.channel(EpollServerSocketChannel.class);
+            } else {
+                jvmBootstrap.channel(NioServerSocketChannel.class);
+            }
+
+            jvmBootstrap.childHandler(new ChannelInitializer<LocalChannel>() {
+                @Override
+                protected void initChannel(LocalChannel ch) throws Exception {
+                    synchronized (suspensionLock) {
+                        while (suspended) {
+                            suspensionLock.wait();
+                        }
+                    }
+
+                    BookieSideConnectionPeerContextHandler contextHandler = new BookieSideConnectionPeerContextHandler();
+                    ChannelPipeline pipeline = ch.pipeline();
+
+                    pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
+                    pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+
+                    pipeline.addLast("bookieProtoDecoder", requestDecoder);
+                    pipeline.addLast("bookieProtoEncoder", responseEncoder);
+                    pipeline.addLast("bookieAuthHandler", new AuthHandler.ServerSideHandler(contextHandler.getConnectionPeer(), authProviderFactory));
+
+                    ChannelInboundHandler requestHandler = isRunning.get()
+                            ? new BookieRequestHandler(conf, requestProcessor, allChannels) : new RejectRequestHandler();
+                    pipeline.addLast("bookieRequestHandler", requestHandler);
+
+                }
+            });
+
+            // use the same address 'name', so clients can find local Bookie still discovering them using ZK
+            jvmBootstrap.bind(bookieAddress.getLocalAddress()).sync();
+            LocalBookiesRegistry.registerLocalBookieAddress(bookieAddress);
+        }
+    }
+
     void start() {
         isRunning.set(true);
     }
@@ -134,13 +286,23 @@ class BookieNettyServer {
         LOG.info("Shutting down BookieNettyServer");
         isRunning.set(false);
         allChannels.close().awaitUninterruptibly();
-        for (ChannelManager channel : channels) {
-            channel.close();
+
+        if (eventLoopGroup != null) {
+            try {
+                eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.MILLISECONDS).await();
+            } catch (InterruptedException e) {
+                /// OK
+            }
+        }
+        if (jvmEventLoopGroup != null) {
+            LocalBookiesRegistry.unregisterLocalBookieAddress(bookieAddress);
+            jvmEventLoopGroup.shutdownGracefully();
         }
+
         authProviderFactory.close();
     }
 
-    class BookieSideConnectionPeerContextHandler extends SimpleChannelHandler {
+    class BookieSideConnectionPeerContextHandler extends ChannelInboundHandlerAdapter {
 
         final BookieConnectionPeer connectionPeer;
         volatile Channel channel;
@@ -152,7 +314,7 @@ class BookieNettyServer {
                 public SocketAddress getRemoteAddr() {
                     Channel c = channel;
                     if (c != null) {
-                        return c.getRemoteAddress();
+                        return c.remoteAddress();
                     } else {
                         return null;
                     }
@@ -191,56 +353,25 @@ class BookieNettyServer {
         }
 
         @Override
-        public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-            channel = ctx.getChannel();
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
+            channel = ctx.channel();
         }
 
     }
 
-    class BookiePipelineFactory implements ChannelPipelineFactory {
-
-        public ChannelPipeline getPipeline() throws Exception {
-            synchronized (suspensionLock) {
-                while (suspended) {
-                    suspensionLock.wait();
-                }
-            }
-            BookieSideConnectionPeerContextHandler contextHandler = new BookieSideConnectionPeerContextHandler();
-            ChannelPipeline pipeline = Channels.pipeline();
-            pipeline.addLast("lengthbaseddecoder",
-                new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
-            pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-
-            pipeline.addLast("bookieProtoDecoder", requestDecoder);
-            pipeline.addLast("bookieProtoEncoder", responseEncoder);
-            pipeline.addLast("bookieAuthHandler",
-                new AuthHandler.ServerSideHandler(contextHandler.getConnectionPeer(), authProviderFactory));
-
-            SimpleChannelHandler requestHandler = isRunning.get()
-                ? new BookieRequestHandler(conf, requestProcessor, allChannels)
-                : new RejectRequestHandler();
-
-            pipeline.addLast("bookieRequestHandler", requestHandler);
-            pipeline.addLast("contextHandler", contextHandler);
-            return pipeline;
-        }
-    }
-
-    private static class RejectRequestHandler extends SimpleChannelHandler {
-
+    private static class RejectRequestHandler extends ChannelInboundHandlerAdapter {
         @Override
-        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-            ctx.getChannel().close();
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
+            ctx.channel().close();
         }
-
     }
 
     private static class CleanupChannelGroup extends DefaultChannelGroup {
 
         private AtomicBoolean closed = new AtomicBoolean(false);
 
-        CleanupChannelGroup() {
-            super("BookieChannelGroup");
+        public CleanupChannelGroup(EventLoopGroup eventLoopGroup) {
+            super("BookieChannelGroup", eventLoopGroup.next());
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 683a6fb..148b31d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -20,23 +20,30 @@
  */
 package org.apache.bookkeeper.proto;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.MessageToMessageEncoder;
+
+import java.io.IOException;
+import java.util.List;
 
-import org.jboss.netty.buffer.ChannelBufferFactory;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
 import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
+import org.apache.bookkeeper.util.DoubleByteBuf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.MessageLite;
+
 public class BookieProtoEncoding {
     private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
 
@@ -50,7 +57,7 @@ public class BookieProtoEncoding {
          * @return encode buffer.
          * @throws Exception
          */
-        public Object encode(Object object, ChannelBufferFactory factory) throws Exception;
+        public Object encode(Object object, ByteBufAllocator allocator) throws Exception;
 
         /**
          * Decode a <i>packet</i> into an object.
@@ -60,7 +67,7 @@ public class BookieProtoEncoding {
          * @return parsed object.
          * @throws Exception
          */
-        public Object decode(ChannelBuffer packet) throws Exception;
+        public Object decode(ByteBuf packet) throws Exception;
 
     }
 
@@ -72,7 +79,7 @@ public class BookieProtoEncoding {
         }
 
         @Override
-        public Object encode(Object msg, ChannelBufferFactory bufferFactory)
+        public Object encode(Object msg, ByteBufAllocator allocator)
                 throws Exception {
             if (!(msg instanceof BookieProtocol.Request)) {
                 return msg;
@@ -82,10 +89,10 @@ public class BookieProtoEncoding {
                 BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest)r;
                 int totalHeaderSize = 4 // for the header
                     + BookieProtocol.MASTER_KEY_LENGTH; // for the master key
-                ChannelBuffer buf = bufferFactory.getBuffer(totalHeaderSize);
+                ByteBuf buf = allocator.buffer(totalHeaderSize);
                 buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
                 buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
-                return ChannelBuffers.wrappedBuffer(buf, ar.getData());
+                return DoubleByteBuf.get(buf, ar.getData());
             } else if (r instanceof BookieProtocol.ReadRequest) {
                 int totalHeaderSize = 4 // for request type
                     + 8 // for ledgerId
@@ -94,7 +101,7 @@ public class BookieProtoEncoding {
                     totalHeaderSize += BookieProtocol.MASTER_KEY_LENGTH;
                 }
 
-                ChannelBuffer buf = bufferFactory.getBuffer(totalHeaderSize);
+                ByteBuf buf = allocator.buffer(totalHeaderSize);
                 buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
                 buf.writeLong(r.getLedgerId());
                 buf.writeLong(r.getEntryId());
@@ -107,11 +114,11 @@ public class BookieProtoEncoding {
                 BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest)r).getAuthMessage();
                 int totalHeaderSize = 4; // for request type
                 int totalSize = totalHeaderSize + am.getSerializedSize();
-                ChannelBuffer buf = bufferFactory.getBuffer(totalSize);
+                ByteBuf buf = allocator.buffer(totalSize);
                 buf.writeInt(new PacketHeader(r.getProtocolVersion(),
                                               r.getOpCode(),
                                               r.getFlags()).toInt());
-                ChannelBufferOutputStream bufStream = new ChannelBufferOutputStream(buf);
+                ByteBufOutputStream bufStream = new ByteBufOutputStream(buf);
                 am.writeTo(bufStream);
                 return buf;
             } else {
@@ -120,7 +127,7 @@ public class BookieProtoEncoding {
         }
 
         @Override
-        public Object decode(ChannelBuffer packet)
+        public Object decode(ByteBuf packet)
                 throws Exception {
             PacketHeader h = PacketHeader.fromInt(packet.readInt());
 
@@ -138,12 +145,12 @@ public class BookieProtoEncoding {
                 masterKey = new byte[BookieProtocol.MASTER_KEY_LENGTH];
                 packet.readBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
 
-                ChannelBuffer bb = packet.duplicate();
-
-                ledgerId = bb.readLong();
-                entryId = bb.readLong();
-                return new BookieProtocol.AddRequest(h.getVersion(), ledgerId, entryId,
-                        flags, masterKey, packet.slice());
+                // Read ledger and entry id without advancing the reader index
+                packet.markReaderIndex();
+                ledgerId = packet.readLong();
+                entryId = packet.readLong();
+                packet.resetReaderIndex();
+                return new BookieProtocol.AddRequest(h.getVersion(), ledgerId, entryId, flags, masterKey, packet.retain());
             case BookieProtocol.READENTRY:
                 ledgerId = packet.readLong();
                 entryId = packet.readLong();
@@ -159,7 +166,7 @@ public class BookieProtoEncoding {
             case BookieProtocol.AUTH:
                 BookkeeperProtocol.AuthMessage.Builder builder
                     = BookkeeperProtocol.AuthMessage.newBuilder();
-                builder.mergeFrom(new ChannelBufferInputStream(packet), extensionRegistry);
+                builder.mergeFrom(new ByteBufInputStream(packet), extensionRegistry);
                 return new BookieProtocol.AuthRequest(h.getVersion(), builder.build());
             }
             return packet;
@@ -174,13 +181,13 @@ public class BookieProtoEncoding {
         }
 
         @Override
-        public Object encode(Object msg, ChannelBufferFactory bufferFactory)
+        public Object encode(Object msg, ByteBufAllocator allocator)
                 throws Exception {
             if (!(msg instanceof BookieProtocol.Response)) {
                 return msg;
             }
             BookieProtocol.Response r = (BookieProtocol.Response)msg;
-            ChannelBuffer buf = bufferFactory.getBuffer(24);
+            ByteBuf buf = allocator.buffer(24);
             buf.writeInt(new PacketHeader(r.getProtocolVersion(),
                                           r.getOpCode(), (short)0).toInt());
 
@@ -192,8 +199,7 @@ public class BookieProtoEncoding {
 
                 BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
                 if (rr.hasData()) {
-                    return ChannelBuffers.wrappedBuffer(buf,
-                            ChannelBuffers.wrappedBuffer(rr.getData()));
+                    return DoubleByteBuf.get(buf, rr.getData());
                 } else {
                     return buf;
                 }
@@ -205,15 +211,14 @@ public class BookieProtoEncoding {
                 return buf;
             } else if (msg instanceof BookieProtocol.AuthResponse) {
                 BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse)r).getAuthMessage();
-                return ChannelBuffers.wrappedBuffer(buf,
-                        ChannelBuffers.wrappedBuffer(am.toByteArray()));
+                return DoubleByteBuf.get(buf, Unpooled.wrappedBuffer(am.toByteArray()));
             } else {
                 LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
                 return msg;
             }
         }
         @Override
-        public Object decode(ChannelBuffer buffer)
+        public Object decode(ByteBuf buffer)
                 throws Exception {
             int rc;
             long ledgerId, entryId;
@@ -240,7 +245,7 @@ public class BookieProtoEncoding {
                                                            ledgerId, entryId);
                 }
             case BookieProtocol.AUTH:
-                ChannelBufferInputStream bufStream = new ChannelBufferInputStream(buffer);
+                ByteBufInputStream bufStream = new ByteBufInputStream(buffer);
                 BookkeeperProtocol.AuthMessage.Builder builder
                     = BookkeeperProtocol.AuthMessage.newBuilder();
                 builder.mergeFrom(bufStream, extensionRegistry);
@@ -260,15 +265,14 @@ public class BookieProtoEncoding {
         }
 
         @Override
-        public Object decode(ChannelBuffer packet) throws Exception {
-            return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet),
-                                                        extensionRegistry);
+        public Object decode(ByteBuf packet) throws Exception {
+            return BookkeeperProtocol.Request.parseFrom(new ByteBufInputStream(packet), extensionRegistry);
         }
 
         @Override
-        public Object encode(Object msg, ChannelBufferFactory factory) throws Exception {
+        public Object encode(Object msg, ByteBufAllocator allocator) throws Exception {
             BookkeeperProtocol.Request request = (BookkeeperProtocol.Request) msg;
-            return ChannelBuffers.wrappedBuffer(request.toByteArray());
+            return serializeProtobuf(request, allocator);
         }
 
     }
@@ -281,20 +285,37 @@ public class BookieProtoEncoding {
         }
 
         @Override
-        public Object decode(ChannelBuffer packet) throws Exception {
-            return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet),
+        public Object decode(ByteBuf packet) throws Exception {
+            return BookkeeperProtocol.Response.parseFrom(new ByteBufInputStream(packet),
                                                          extensionRegistry);
         }
 
         @Override
-        public Object encode(Object msg, ChannelBufferFactory factory) throws Exception {
+        public Object encode(Object msg, ByteBufAllocator allocator) throws Exception {
             BookkeeperProtocol.Response response = (BookkeeperProtocol.Response) msg;
-            return ChannelBuffers.wrappedBuffer(response.toByteArray());
+            return serializeProtobuf(response, allocator);
+        }
+
+    }
+
+    private static ByteBuf serializeProtobuf(MessageLite msg, ByteBufAllocator allocator) {
+        int size = msg.getSerializedSize();
+        ByteBuf buf = allocator.heapBuffer(size, size);
+
+        try {
+            msg.writeTo(CodedOutputStream.newInstance(buf.array(), buf.arrayOffset() + buf.writerIndex(), size));
+        } catch (IOException e) {
+            // This is in-memory serialization, should not fail
+            throw new RuntimeException(e);
         }
 
+        // Advance writer idx
+        buf.writerIndex(buf.capacity());
+        return buf;
     }
 
-    public static class RequestEncoder extends OneToOneEncoder {
+    @Sharable
+    public static class RequestEncoder extends MessageToMessageEncoder<Object> {
 
         final EnDecoder REQ_PREV3;
         final EnDecoder REQ_V3;
@@ -305,23 +326,23 @@ public class BookieProtoEncoding {
         }
 
         @Override
-        protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
-                throws Exception {
+        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Encode request {} to channel {}.", msg, channel);
+                LOG.debug("Encode request {} to channel {}.", msg, ctx.channel());
             }
             if (msg instanceof BookkeeperProtocol.Request) {
-                return REQ_V3.encode(msg, ctx.getChannel().getConfig().getBufferFactory());
+                out.add(REQ_V3.encode(msg, ctx.alloc()));
             } else if (msg instanceof BookieProtocol.Request) {
-                return REQ_PREV3.encode(msg, ctx.getChannel().getConfig().getBufferFactory());
+                out.add(REQ_PREV3.encode(msg, ctx.alloc()));
             } else {
-                LOG.error("Invalid request to encode to {}: {}", channel, msg.getClass().getName());
-                return msg;
+                LOG.error("Invalid request to encode to {}: {}", ctx.channel(), msg.getClass().getName());
+                out.add(msg);
             }
         }
     }
 
-    public static class RequestDecoder extends OneToOneDecoder {
+    @Sharable
+    public static class RequestDecoder extends MessageToMessageDecoder<Object> {
         final EnDecoder REQ_PREV3;
         final EnDecoder REQ_V3;
 
@@ -331,31 +352,32 @@ public class BookieProtoEncoding {
         }
 
         @Override
-        protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
-                throws Exception {
+        protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Received request {} from channel {} to decode.", msg, channel);
+                LOG.debug("Received request {} from channel {} to decode.", msg, ctx.channel());
             }
-            if (!(msg instanceof ChannelBuffer)) {
-                return msg;
+            if (!(msg instanceof ByteBuf)) {
+                out.add(msg);
+                return;
             }
-            ChannelBuffer buffer = (ChannelBuffer) msg;
+            ByteBuf buffer = (ByteBuf) msg;
             try {
                 buffer.markReaderIndex();
                 try {
-                    return REQ_V3.decode(buffer);
+                    out.add(REQ_V3.decode(buffer));
                 } catch (InvalidProtocolBufferException e) {
                     buffer.resetReaderIndex();
-                    return REQ_PREV3.decode(buffer);
+                    out.add(REQ_PREV3.decode(buffer));
                 }
             } catch (Exception e) {
-                LOG.error("Failed to decode a request from {} : ", channel, e);
-                throw e;
+                LOG.error("Failed to decode a request from {} : ", ctx.channel(), e);
+                ctx.close();
             }
         }
     }
 
-    public static class ResponseEncoder extends OneToOneEncoder {
+    @Sharable
+    public static class ResponseEncoder extends MessageToMessageEncoder<Object> {
         final EnDecoder REP_PREV3;
         final EnDecoder REP_V3;
 
@@ -365,23 +387,24 @@ public class BookieProtoEncoding {
         }
 
         @Override
-        protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out)
                 throws Exception {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Encode response {} to channel {}.", msg, channel);
+                LOG.debug("Encode response {} to channel {}.", msg, ctx.channel());
             }
             if (msg instanceof BookkeeperProtocol.Response) {
-                return REP_V3.encode(msg, ctx.getChannel().getConfig().getBufferFactory());
+                out.add(REP_V3.encode(msg, ctx.alloc()));
             } else if (msg instanceof BookieProtocol.Response) {
-                return REP_PREV3.encode(msg, ctx.getChannel().getConfig().getBufferFactory());
+                out.add(REP_PREV3.encode(msg, ctx.alloc()));
             } else {
-                LOG.error("Invalid response to encode to {}: {}", channel, msg.getClass().getName());
-                return msg;
+                LOG.error("Invalid response to encode to {}: {}", ctx.channel(), msg.getClass().getName());
+                out.add(msg);
             }
         }
     }
 
-    public static class ResponseDecoder extends OneToOneDecoder {
+    @Sharable
+    public static class ResponseDecoder extends MessageToMessageDecoder<Object> {
         final EnDecoder REP_PREV3;
         final EnDecoder REP_V3;
 
@@ -391,26 +414,25 @@ public class BookieProtoEncoding {
         }
 
         @Override
-        protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
-                throws Exception {
+        protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Received response {} from channel {} to decode.", msg, channel);
+                LOG.debug("Received response {} from channel {} to decode.", msg, ctx.channel());
             }
-            if (!(msg instanceof ChannelBuffer)) {
-                return msg;
+            if (!(msg instanceof ByteBuf)) {
+                out.add(msg);
             }
-            ChannelBuffer buffer = (ChannelBuffer) msg;
+            ByteBuf buffer = (ByteBuf) msg;
             try {
                 buffer.markReaderIndex();
                 try {
-                    return REP_V3.decode(buffer);
+                    out.add(REP_V3.decode(buffer));
                 } catch (InvalidProtocolBufferException e) {
                     buffer.resetReaderIndex();
-                    return REP_PREV3.decode(buffer);
+                    out.add(REP_PREV3.decode(buffer));
                 }
             } catch (Exception e) {
-                LOG.error("Failed to decode a response from channel {} : ", channel, e);
-                throw e;
+                LOG.error("Failed to decode a response from channel {} : ", ctx.channel(), e);
+                ctx.close();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 1191d3c..094daab 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -21,7 +21,8 @@ package org.apache.bookkeeper.proto;
  *
  */
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
+
 import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
@@ -249,25 +250,29 @@ public interface BookieProtocol {
     }
 
     static class AddRequest extends Request {
-        final ChannelBuffer data;
+        final ByteBuf data;
 
         AddRequest(byte protocolVersion, long ledgerId, long entryId,
-                   short flags, byte[] masterKey, ChannelBuffer data) {
+                   short flags, byte[] masterKey, ByteBuf data) {
             super(protocolVersion, ADDENTRY, ledgerId, entryId, flags, masterKey);
-            this.data = data;
+            this.data = data.retain();
         }
 
-        ChannelBuffer getData() {
+        ByteBuf getData() {
             return data;
         }
 
         ByteBuffer getDataAsByteBuffer() {
-            return data.toByteBuffer().slice();
+            return data.nioBuffer().slice();
         }
 
         boolean isRecoveryAdd() {
             return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
         }
+
+        void release() {
+            data.release();
+        }
     }
 
     static class ReadRequest extends Request {
@@ -342,14 +347,14 @@ public interface BookieProtocol {
     }
 
     static class ReadResponse extends Response {
-        final ChannelBuffer data;
+        final ByteBuf data;
 
         ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
             super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
             this.data = null;
         }
 
-        ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ChannelBuffer data) {
+        ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ByteBuf data) {
             super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
             this.data = data;
         }
@@ -358,7 +363,7 @@ public interface BookieProtocol {
             return data != null;
         }
 
-        ChannelBuffer getData() {
+        ByteBuf getData() {
             return data;
         }
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
index 3c5f128..b1bc081 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
@@ -20,23 +20,22 @@
  */
 package org.apache.bookkeeper.proto;
 
+
+import java.nio.channels.ClosedChannelException;
+
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.channels.ClosedChannelException;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
 
 /**
  * Serverside handler for bookkeeper requests
  */
-class BookieRequestHandler extends SimpleChannelHandler {
+class BookieRequestHandler extends ChannelInboundHandlerAdapter {
 
     private final static Logger LOG = LoggerFactory.getLogger(BookieRequestHandler.class);
     private final RequestProcessor requestProcessor;
@@ -48,42 +47,36 @@ class BookieRequestHandler extends SimpleChannelHandler {
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx,
-                            ChannelStateEvent e)
-            throws Exception {
-        allChannels.add(ctx.getChannel());
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        LOG.info("Channel connected: {}", ctx.channel());
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-        Throwable throwable = e.getCause();
-        if (throwable instanceof ClosedChannelException) {
-            LOG.debug("Client died before request could be completed", throwable);
-            return;
-        }
-        LOG.error("Unhandled exception occurred in I/O thread or handler", throwable);
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+        allChannels.add(ctx.channel());
     }
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
-            throws Exception {
-        LOG.debug("Channel connected {}", e);
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        LOG.info("Channels disconnected: {}", ctx.channel());
     }
 
     @Override
-    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
-            throws Exception {
-        LOG.debug("Channel disconnected {}", e);
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        if (cause instanceof ClosedChannelException) {
+            LOG.info("Client died before request could be completed", cause);
+            return;
+        }
+        LOG.error("Unhandled exception occurred in I/O thread or handler", cause);
+        ctx.close();
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-        Object event = e.getMessage();
-        if (!(event instanceof BookkeeperProtocol.Request || event instanceof BookieProtocol.Request)) {
-            ctx.sendUpstream(e);
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (!(msg instanceof BookkeeperProtocol.Request || msg instanceof BookieProtocol.Request)) {
+            ctx.fireChannelRead(msg);
             return;
         }
-        requestProcessor.processRequest(event, ctx.getChannel());
+        requestProcessor.processRequest(msg, ctx.channel());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 38f40f8..0746686 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -20,10 +20,10 @@
  */
 package org.apache.bookkeeper.proto;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+
+import io.netty.channel.Channel;
+
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
 
@@ -33,7 +33,6 @@ import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -132,7 +131,7 @@ public class BookieRequestProcessor implements RequestProcessor {
                     processReadRequestV3(r, c);
                     break;
                 case AUTH:
-                    LOG.info("Ignoring auth operation from client {}",c.getRemoteAddress());
+                    LOG.info("Ignoring auth operation from client {}",c.remoteAddress());
                     BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
                         .newBuilder()
                         .setAuthPluginName(AuthProviderFactoryFactory.authenticationDisabledPluginName)
@@ -142,7 +141,7 @@ public class BookieRequestProcessor implements RequestProcessor {
                             BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
                             .setStatus(BookkeeperProtocol.StatusCode.EOK)
                             .setAuthResponse(message);
-                    c.write(authResponse.build());
+                    c.writeAndFlush(authResponse.build());
                     break;
                 case WRITE_LAC:
                     processWriteLacRequestV3(r,c);
@@ -158,7 +157,7 @@ public class BookieRequestProcessor implements RequestProcessor {
                     BookkeeperProtocol.Response.Builder response =
                             BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
                             .setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
-                    c.write(response.build());
+                    c.writeAndFlush(response.build());
                     if (statsEnabled) {
                         bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
                     }
@@ -176,7 +175,7 @@ public class BookieRequestProcessor implements RequestProcessor {
                     break;
                 default:
                     LOG.error("Unknown op type {}, sending error", r.getOpCode());
-                    c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
+                    c.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
                     if (statsEnabled) {
                         bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
                     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index b2d6d82..58fd451 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -21,6 +21,8 @@
 
 package org.apache.bookkeeper.proto;
 
+import io.netty.buffer.ByteBuf;
+
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -28,7 +30,6 @@ import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.zookeeper.AsyncCallback;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +68,7 @@ public class BookkeeperInternalCallbacks {
     }
 
     public interface ReadLacCallback {
-        void readLacComplete(int rc, long ledgerId, ChannelBuffer lac, ChannelBuffer buffer, Object ctx);
+        void readLacComplete(int rc, long ledgerId, ByteBuf lac, ByteBuf buffer, Object ctx);
     }
 
     public interface WriteLacCallback {
@@ -86,7 +87,7 @@ public class BookkeeperInternalCallbacks {
      */
 
     public interface ReadEntryCallback {
-        void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx);
+        void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx);
     }
 
     public interface GetBookieInfoCallback {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java
deleted file mode 100644
index 774086c..0000000
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.proto;
-
-import java.io.IOException;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-
-/**
- * Manages the lifycycle of a communication Channel
- * @author enrico.olivelli
- */
-public abstract class ChannelManager {
-
-    /**
-     * Boots the Channel
-     * @param conf Bookie Configuration
-     * @param channelPipelineFactory Netty Pipeline Factory
-     * @param bookieAddress The actual address to listen on
-     * @return the channel which is listening for incoming connections
-     * @throws IOException 
-     */
-    public abstract Channel start(ServerConfiguration conf, ChannelPipelineFactory channelPipelineFactory) throws IOException;
-
-    /**
-     * Releases all resources
-     */
-    public abstract void close();
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index 2658634..36a26ef 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 import java.util.concurrent.atomic.AtomicInteger;
 import com.google.common.base.Preconditions;
 
@@ -71,12 +72,12 @@ class DefaultPerChannelBookieClientPool implements PerChannelBookieClientPool,
     }
 
     @Override
-    public void obtain(GenericCallback<PerChannelBookieClient> callback) {
+    public void obtain(GenericCallback<PerChannelBookieClient> callback, long key) {
         if (1 == clients.length) {
             clients[0].connectIfNeededAndDoOp(callback);
             return;
         }
-        int idx = MathUtils.signSafeMod(counter.getAndIncrement(), clients.length);
+        int idx = MathUtils.signSafeMod(key, clients.length);
         clients[idx].connectIfNeededAndDoOp(callback);
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
index 88c5eb1..1872701 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -28,10 +28,11 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.util.MathUtils;
-import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.channel.Channel;
+
 public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
     private final static Logger LOG = LoggerFactory.getLogger(GetBookieInfoProcessorV3.class);
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java
deleted file mode 100644
index 91fde2c..0000000
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.proto;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-
-/**
- * Manages a NioServerSocketChannel channel
- *
- * @author enrico.olivelli
- */
-public class NioServerSocketChannelManager extends ChannelManager {
-
-    private ChannelFactory channelFactory;
-
-    @Override
-    public Channel start(ServerConfiguration conf, ChannelPipelineFactory bookiePipelineFactory) throws IOException {
-        BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf);
-        ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
-        String base = "bookie-" + conf.getBookiePort() + "-netty";
-        this.channelFactory = new NioServerSocketChannelFactory(
-                Executors.newCachedThreadPool(tfb.setNameFormat(base + "-boss-%d").build()),
-                Executors.newCachedThreadPool(tfb.setNameFormat(base + "-worker-%d").build()));
-
-        ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
-        bootstrap.setPipelineFactory(bookiePipelineFactory);
-        bootstrap.setOption("child.tcpNoDelay", conf.getServerTcpNoDelay());
-        bootstrap.setOption("child.soLinger", 2);
-
-        InetSocketAddress bindAddress;
-        if (conf.getListeningInterface() == null) {
-            // listen on all interfaces
-            bindAddress = new InetSocketAddress(conf.getBookiePort());
-        } else {
-            bindAddress = bookieAddress.getSocketAddress();
-        }
-        
-        Channel listen = bootstrap.bind(bindAddress);
-        return listen;
-    }
-
-    @Override
-    public void close() {
-        if (channelFactory != null) {
-            channelFactory.releaseExternalResources();
-        }
-        channelFactory = null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index 681f6c6..4f14dcf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -23,10 +23,11 @@ import org.apache.bookkeeper.proto.BookieProtocol.Request;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
-import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.channel.Channel;
+
 abstract class PacketProcessorBase extends SafeRunnable {
     private final static Logger logger = LoggerFactory.getLogger(PacketProcessorBase.class);
     final Request request;
@@ -55,7 +56,7 @@ abstract class PacketProcessorBase extends SafeRunnable {
     }
 
     protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) {
-        channel.write(response);
+        channel.writeAndFlush(response, channel.voidPromise());
         if (BookieProtocol.EOK == rc) {
             statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
         } else {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index 85ec6cb..873ef30 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.channel.Channel;
+
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
@@ -29,7 +31,6 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
-import org.jboss.netty.channel.Channel;
 
 public abstract class PacketProcessorBaseV3 extends SafeRunnable {
 
@@ -47,7 +48,7 @@ public abstract class PacketProcessorBaseV3 extends SafeRunnable {
     }
 
     protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) {
-        channel.write(response);
+        channel.writeAndFlush(response);
         if (StatusCode.EOK == code) {
             statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
         } else {


Mime
View raw message