bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-930: Option to disable Bookie networking
Date Sat, 10 Sep 2016 18:27:49 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 28f23e80b -> 9db51b8d5


BOOKKEEPER-930: Option to disable Bookie networking

Author: eolivelli <eolivelli@gmail.com>

Reviewers: sijie@apache.org <sijie@apache.org>

Closes #49 from eolivelli/BOOKKEEPER-930


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/9db51b8d
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/9db51b8d
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/9db51b8d

Branch: refs/heads/master
Commit: 9db51b8d532d18485798d9dd96973c22450a0495
Parents: 28f23e8
Author: eolivelli <eolivelli@gmail.com>
Authored: Sat Sep 10 11:27:43 2016 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Sat Sep 10 11:27:43 2016 -0700

----------------------------------------------------------------------
 .../bookkeeper/conf/ServerConfiguration.java    | 25 ++++-
 .../bookkeeper/proto/BookieNettyServer.java     | 98 +++++++-------------
 .../apache/bookkeeper/proto/ChannelManager.java | 44 +++++++++
 .../bookkeeper/proto/LocalBookiesRegistry.java  |  4 +-
 .../proto/NioServerSocketChannelManager.java    | 73 +++++++++++++++
 .../proto/PerChannelBookieClient.java           |  4 +-
 .../bookkeeper/proto/VMLocalChannelManager.java | 61 ++++++++++++
 .../bookkeeper/proto/NetworkLessBookieTest.java | 79 ++++++++++++++++
 8 files changed, 318 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 7d9b697..67e81cc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -114,6 +114,7 @@ public class ServerConfiguration extends AbstractConfiguration {
     // registration.
     protected final static String USE_HOST_NAME_AS_BOOKIE_ID = "useHostNameAsBookieID";
     protected final static String ENABLE_LOCAL_TRANSPORT = "enableLocalTransport";
+    protected final static String DISABLE_SERVER_SOCKET_BIND = "disableServerSocketBind";
 
     protected final static String SORTED_LEDGER_STORAGE_ENABLED = "sortedLedgerStorageEnabled";
     protected final static String SKIP_LIST_SIZE_LIMIT = "skipListSizeLimit";
@@ -1561,7 +1562,7 @@ public class ServerConfiguration extends AbstractConfiguration {
     }
 
     /**
-     * Get hwhether to use listen for local JVM clients. Defaults to false.
+     * Get whether to listen for local JVM clients. Defaults to false.
      *
      * @return true, then bookie will be listen for local JVM clients
      */
@@ -1582,6 +1583,28 @@ public class ServerConfiguration extends AbstractConfiguration {
         return this;
     }
 
+    /**
+     * Get whether to disable bind of server-side sockets. Defaults to false.
+     *
+     * @return true, then bookie will not listen for network connections
+     */
+    public boolean isDisableServerSocketBind() {
+        return getBoolean(DISABLE_SERVER_SOCKET_BIND, false);
+    }
+
+    /**
+     * Configure the bookie to disable bind on network interfaces,
+     * this bookie will be available only to BookKeeper clients executed on the local JVM
+     *
+     * @see #isDisableServerSocketBind
+     * @param disableServerSocketBind
+     *            whether to disable binding on network interfaces
+     * @return server configuration
+     */
+    public ServerConfiguration setDisableServerSocketBind(boolean disableServerSocketBind) {
+        setProperty(DISABLE_SERVER_SOCKET_BIND, disableServerSocketBind);
+        return this;
+    }
 
     /**
      * Get the stats provider used by bookie.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index 202a5e5..5fcc64e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -21,20 +21,14 @@
 package org.apache.bookkeeper.proto;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.auth.BookieAuthProvider;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.zookeeper.KeeperException;
-import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -44,18 +38,14 @@ import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
 import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.google.protobuf.ExtensionRegistry;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
-import org.jboss.netty.channel.local.LocalAddress;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Netty server for serving bookie requests
@@ -65,21 +55,19 @@ class BookieNettyServer {
 
     final static int maxMessageSize = 0xfffff;
     final ServerConfiguration conf;
-    final ChannelFactory serverChannelFactory;
-    final ChannelFactory jvmServerChannelFactory;
+    final List<ChannelManager> channels = new ArrayList<>();
     final RequestProcessor requestProcessor;
     final ChannelGroup allChannels = new CleanupChannelGroup();
     final AtomicBoolean isRunning = new AtomicBoolean(false);
-    Object suspensionLock = new Object();
+    final Object suspensionLock = new Object();
     boolean suspended = false;
-    final BookieSocketAddress bookieAddress;
 
     final BookieAuthProvider.Factory authProviderFactory;
     final BookieProtoEncoding.ResponseEncoder responseEncoder;
     final BookieProtoEncoding.RequestDecoder requestDecoder;
 
     BookieNettyServer(ServerConfiguration conf, RequestProcessor processor)
-            throws IOException, KeeperException, InterruptedException, BookieException  {
+            throws IOException, KeeperException, InterruptedException, BookieException {
         this.conf = conf;
         this.requestProcessor = processor;
 
@@ -89,25 +77,25 @@ class BookieNettyServer {
         responseEncoder = new BookieProtoEncoding.ResponseEncoder(registry);
         requestDecoder = new BookieProtoEncoding.RequestDecoder(registry);
 
-        ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
-        String base = "bookie-" + conf.getBookiePort() + "-netty";
-        serverChannelFactory = new NioServerSocketChannelFactory(
-                Executors.newCachedThreadPool(tfb.setNameFormat(base + "-boss-%d").build()),
-                Executors.newCachedThreadPool(tfb.setNameFormat(base + "-worker-%d").build()));
+        if (!conf.isDisableServerSocketBind()) {
+            channels.add(new NioServerSocketChannelManager());
+        }
         if (conf.isEnableLocalTransport()) {
-            jvmServerChannelFactory = new DefaultLocalServerChannelFactory();
-        } else {
-            jvmServerChannelFactory = null;
+            channels.add(new VMLocalChannelManager());
         }
-        bookieAddress = Bookie.getBookieAddress(conf);
-        InetSocketAddress bindAddress;
-        if (conf.getListeningInterface() == null) {
-            // listen on all interfaces
-            bindAddress = new InetSocketAddress(conf.getBookiePort());
-        } else {
-            bindAddress = bookieAddress.getSocketAddress();
+        try {
+            for (ChannelManager channel : channels) {
+                Channel nettyChannel = channel.start(conf, new BookiePipelineFactory());
+                allChannels.add(nettyChannel);
+            }
+        } catch (IOException bindError) {
+            // clean up all the channels, if this constructor throws an exception the caller code will
+            // not be able to call close(), leading to a resource leak 
+            for (ChannelManager channel : channels) {
+                channel.close();
+            }
+            throw bindError;
         }
-        listenOn(bindAddress, bookieAddress);
     }
 
     boolean isRunning() {
@@ -131,44 +119,21 @@ class BookieNettyServer {
         }
     }
 
-    private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddress) {
-        ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
-        bootstrap.setPipelineFactory(new BookiePipelineFactory());
-        bootstrap.setOption("child.tcpNoDelay", conf.getServerTcpNoDelay());
-        bootstrap.setOption("child.soLinger", 2);
-
-        Channel listen = bootstrap.bind(address);
-        allChannels.add(listen);
-
-        if (conf.isEnableLocalTransport()) {
-            ServerBootstrap jvmbootstrap = new ServerBootstrap(jvmServerChannelFactory);
-            jvmbootstrap.setPipelineFactory(new BookiePipelineFactory());
-
-            // use the same address 'name', so clients can find local Bookie still discovering them using ZK
-            Channel jvmlisten = jvmbootstrap.bind(bookieAddress.getLocalAddress());
-            allChannels.add(jvmlisten);
-            LocalBookiesRegistry.registerLocalBookieAddress(bookieAddress);
-        }
-    }
-
     void start() {
         isRunning.set(true);
     }
 
     void shutdown() {
         LOG.info("Shutting down BookieNettyServer");
-        if (conf.isEnableLocalTransport()) {
-            LocalBookiesRegistry.unregisterLocalBookieAddress(bookieAddress);
-        }
         isRunning.set(false);
         allChannels.close().awaitUninterruptibly();
-        serverChannelFactory.releaseExternalResources();
-        if (conf.isEnableLocalTransport()) {
-            jvmServerChannelFactory.releaseExternalResources();
+        for (ChannelManager channel : channels) {
+            channel.close();
         }
     }
 
-    private class BookiePipelineFactory implements ChannelPipelineFactory {
+    class BookiePipelineFactory implements ChannelPipelineFactory {
+
         public ChannelPipeline getPipeline() throws Exception {
             synchronized (suspensionLock) {
                 while (suspended) {
@@ -177,16 +142,16 @@ class BookieNettyServer {
             }
             ChannelPipeline pipeline = Channels.pipeline();
             pipeline.addLast("lengthbaseddecoder",
-                             new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
+                    new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
             pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
 
             pipeline.addLast("bookieProtoDecoder", requestDecoder);
             pipeline.addLast("bookieProtoEncoder", responseEncoder);
             pipeline.addLast("bookieAuthHandler",
-                             new AuthHandler.ServerSideHandler(authProviderFactory));
+                    new AuthHandler.ServerSideHandler(authProviderFactory));
 
-            SimpleChannelHandler requestHandler = isRunning.get() ?
-                    new BookieRequestHandler(conf, requestProcessor, allChannels)
+            SimpleChannelHandler requestHandler = isRunning.get()
+                    ? new BookieRequestHandler(conf, requestProcessor, allChannels)
                     : new RejectRequestHandler();
 
             pipeline.addLast("bookieRequestHandler", requestHandler);
@@ -204,6 +169,7 @@ class BookieNettyServer {
     }
 
     private static class CleanupChannelGroup extends DefaultChannelGroup {
+
         private AtomicBoolean closed = new AtomicBoolean(false);
 
         CleanupChannelGroup() {
@@ -230,9 +196,9 @@ class BookieNettyServer {
             if (!(o instanceof CleanupChannelGroup)) {
                 return false;
             }
-            CleanupChannelGroup other = (CleanupChannelGroup)o;
+            CleanupChannelGroup other = (CleanupChannelGroup) o;
             return other.closed.get() == closed.get()
-                && super.equals(other);
+                    && super.equals(other);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java
new file mode 100644
index 0000000..15f00db
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+
+/**
+ * Manages the lifecycle of a communication Channel
+ * @author enrico.olivelli
+ */
+public abstract class ChannelManager {
+
+    /**
+     * Boots the Channel
+     * @param conf Bookie Configuration
+     * @param channelPipelineFactory Netty Pipeline Factory
+     * @param bookieAddress The actual address to listen on
+     * @return the channel which is listening for incoming connections
+     * @throws IOException 
+     */
+    public abstract Channel start(ServerConfiguration conf, ChannelPipelineFactory channelPipelineFactory) throws IOException;
+
+    /**
+     * Releases all resources
+     */
+    public abstract void close();
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
index f123aa6..0dd2aa3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LocalBookiesRegistry.java
@@ -35,7 +35,9 @@ class LocalBookiesRegistry {
         localBookiesRegistry.put(address,Boolean.TRUE);
     }
     static void unregisterLocalBookieAddress(BookieSocketAddress address) {
-        localBookiesRegistry.remove(address);
+        if (address!= null) {
+            localBookiesRegistry.remove(address);
+        }
     }
     static boolean isLocalBookie(BookieSocketAddress address) {        
         return localBookiesRegistry.containsKey(address);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java
new file mode 100644
index 0000000..925d677
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+/**
+ * Manages a NioServerSocketChannel channel
+ *
+ * @author enrico.olivelli
+ */
+public class NioServerSocketChannelManager extends ChannelManager {
+
+    private ChannelFactory channelFactory;
+
+    @Override
+    public Channel start(ServerConfiguration conf, ChannelPipelineFactory bookiePipelineFactory) throws IOException {
+        BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf);
+        ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+        String base = "bookie-" + conf.getBookiePort() + "-netty";
+        this.channelFactory = new NioServerSocketChannelFactory(
+                Executors.newCachedThreadPool(tfb.setNameFormat(base + "-boss-%d").build()),
+                Executors.newCachedThreadPool(tfb.setNameFormat(base + "-worker-%d").build()));
+
+        ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
+        bootstrap.setPipelineFactory(bookiePipelineFactory);
+        bootstrap.setOption("child.tcpNoDelay", conf.getServerTcpNoDelay());
+        bootstrap.setOption("child.soLinger", 2);
+
+        InetSocketAddress bindAddress;
+        if (conf.getListeningInterface() == null) {
+            // listen on all interfaces
+            bindAddress = new InetSocketAddress(conf.getBookiePort());
+        } else {
+            bindAddress = bookieAddress.getSocketAddress();
+        }
+        
+        Channel listen = bootstrap.bind(bindAddress);
+        return listen;
+    }
+
+    @Override
+    public void close() {
+        if (channelFactory != null) {
+            channelFactory.releaseExternalResources();
+        }
+        channelFactory = null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 62f55ea..21b16fb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -588,7 +588,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
             public void safeRun() {
                 String bAddress = "null";
                 Channel c = channel;
-                if (c != null) {
+                if (c != null && c.getRemoteAddress() != null) {
                     bAddress = c.getRemoteAddress().toString();
                 }
 
@@ -620,7 +620,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
             public void safeRun() {
                 String bAddress = "null";
                 Channel c = channel;
-                if(c != null) {
+                if(c != null && c.getRemoteAddress() != null) {
                     bAddress = c.getRemoteAddress().toString();
                 }
                 LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {} rc: {}",

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java
new file mode 100644
index 0000000..03881b5
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
+
+/**
+ * Manages VM-local channels
+ *
+ * @author enrico.olivelli
+ */
+public class VMLocalChannelManager extends ChannelManager {
+
+    private ChannelFactory channelFactory;
+    private BookieSocketAddress bookieAddress;
+
+    @Override
+    public Channel start(ServerConfiguration conf, ChannelPipelineFactory bookiePipelineFactory) throws IOException {
+        BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf);
+        this.channelFactory = new DefaultLocalServerChannelFactory();
+        this.bookieAddress = bookieAddress;
+        ServerBootstrap jvmbootstrap = new ServerBootstrap(channelFactory);
+        jvmbootstrap.setPipelineFactory(bookiePipelineFactory);
+
+        // use the same address 'name', so clients can find local Bookie still discovering them using ZK
+        Channel jvmlisten = jvmbootstrap.bind(bookieAddress.getLocalAddress());
+        LocalBookiesRegistry.registerLocalBookieAddress(bookieAddress);
+        return jvmlisten;
+    }
+
+    @Override
+    public void close() {
+        LocalBookiesRegistry.unregisterLocalBookieAddress(bookieAddress);
+        if (channelFactory != null) {
+            channelFactory.releaseExternalResources();
+        }
+        channelFactory = null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9db51b8d/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
new file mode 100644
index 0000000..5a1f7fc
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
@@ -0,0 +1,79 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.bookkeeper.proto;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.bookkeeper.client.BookKeeper;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BaseTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests of the main BookKeeper client using networkless communication
+ */
+public class NetworkLessBookieTest extends BaseTestCase {
+    
+    protected ServerConfiguration newServerConfiguration() throws Exception {       
+        return super
+                .newServerConfiguration()
+                .setDisableServerSocketBind(true)
+                .setEnableLocalTransport(true);
+    }
+        
+    DigestType digestType;
+    
+    public NetworkLessBookieTest(DigestType digestType) {
+        super(4);            
+        this.digestType=digestType;
+    }
+
+    @Test
+    public void testUseLocalBookie() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(20000);
+
+        CountDownLatch l = new CountDownLatch(1);
+        zkUtil.sleepServer(5, l);
+        l.await();
+                
+        try (BookKeeper bkc = new BookKeeper(conf);) {
+            try (LedgerHandle h = bkc.createLedger(1,1,digestType, "testPasswd".getBytes());) {
+                h.addEntry("test".getBytes());
+            }
+        }
+
+        for (BookieServer bk : bs) {
+            for (ChannelManager channel : bk.nettyServer.channels) {
+                if (! (channel instanceof VMLocalChannelManager)) {
+                    Assert.fail();
+                }
+            }
+        }
+    }
+
+}


Mime
View raw message