cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sn...@apache.org
Subject [1/2] cassandra git commit: Support for both encrypted and unencrypted native transport connections
Date Sat, 05 Sep 2015 16:28:47 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 3c1cb800f -> 5c9792c9a


Support for both encrypted and unencrypted native transport connections

patch by Stefan Podkowinski; reviewed by Robert Stupp for CASSANDRA-9590


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7895cfb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7895cfb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7895cfb

Branch: refs/heads/trunk
Commit: a7895cfb2cdc667041402648bf922265cb0d34c3
Parents: 1752891
Author: Stefan Podkowinski <jira@midnightdrift.com>
Authored: Sat Sep 5 18:23:10 2015 +0200
Committer: Robert Stupp <snazy@snazy.de>
Committed: Sat Sep 5 18:23:10 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 conf/cassandra.yaml                             |   8 +
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |  25 +++
 .../cassandra/service/CassandraDaemon.java      |  38 +++-
 .../service/NativeTransportService.java         | 199 +++++++++++++++++++
 .../cassandra/service/StorageService.java       |  11 +-
 .../org/apache/cassandra/transport/Server.java  | 170 +++++++++-------
 .../org/apache/cassandra/cql3/CQLTester.java    |   2 +-
 .../service/NativeTransportServiceTest.java     | 193 ++++++++++++++++++
 10 files changed, 565 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d4e6771..afd45e5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+3.0.0-rc1
+ * Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590)
+
+
 3.0.0-beta2
  * Fix columns returned by AbstractBtreePartitions (CASSANDRA-10220)
  * Fix backward compatibility issue due to AbstractBounds serialization bug (CASSANDRA-9857)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 0f8b829..28caa1e 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -489,6 +489,14 @@ start_native_transport: true
 # port for the CQL native transport to listen for clients on
 # For security reasons, you should not expose this port to the internet.  Firewall it if
needed.
 native_transport_port: 9042
+# Enabling native transport encryption in client_encryption_options allows you to either
use
+# encryption for the standard port or to use a dedicated, additional port along with the
unencrypted
+# standard native_transport_port.
+# Enabling client encryption and keeping native_transport_port_ssl disabled will use encryption
+# for native_transport_port. Setting native_transport_port_ssl to a different value
+# from native_transport_port will use encryption for native_transport_port_ssl while
+# keeping native_transport_port unencrypted.
+# native_transport_port_ssl: 9142
 # The maximum threads for handling requests when the native transport is used.
 # This is similar to rpc_max_threads though the default differs slightly (and
 # there is no native_transport_min_threads, idle threads will always be stopped

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 22b09d3..164dab2 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -131,6 +131,7 @@ public class Config
 
     public Boolean start_native_transport = false;
     public Integer native_transport_port = 9042;
+    public Integer native_transport_port_ssl = null;
     public Integer native_transport_max_threads = 128;
     public Integer native_transport_max_frame_size_in_mb = 256;
     public volatile Long native_transport_max_concurrent_connections = -1L;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2e68418..99cd563 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -680,6 +680,14 @@ public class DatabaseDescriptor
             conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
         else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
             throw new ConfigurationException("commitlog_segment_size_in_mb must be at least
twice the size of max_mutation_size_in_kb / 1024", false);
+
+        // native transport encryption options
+        if (conf.native_transport_port_ssl != null
+            && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue()
+            && !conf.client_encryption_options.enabled)
+        {
+            throw new ConfigurationException("Encryption must be enabled in client_encryption_options
for native_transport_port_ssl", false);
+        }
     }
 
     private static FileStore guessFileStore(String dir) throws IOException
@@ -1341,6 +1349,23 @@ public class DatabaseDescriptor
         return Integer.parseInt(System.getProperty("cassandra.native_transport_port", conf.native_transport_port.toString()));
     }
 
+    @VisibleForTesting
+    public static void setNativeTransportPort(int port)
+    {
+        conf.native_transport_port = port;
+    }
+
+    public static int getNativeTransportPortSSL()
+    {
+        return conf.native_transport_port_ssl == null ? getNativeTransportPort() : conf.native_transport_port_ssl;
+    }
+
+    @VisibleForTesting
+    public static void setNativeTransportPortSSL(Integer port)
+    {
+        conf.native_transport_port_ssl = port;
+    }
+
     public static Integer getNativeTransportMaxThreads()
     {
         return conf.native_transport_max_threads;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index c8b9677..230b46a 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -127,7 +127,7 @@ public class CassandraDaemon
     private static final CassandraDaemon instance = new CassandraDaemon();
 
     public Server thriftServer;
-    public Server nativeServer;
+    private NativeTransportService nativeTransportService;
 
     private final boolean runManaged;
     protected final StartupChecks startupChecks;
@@ -365,9 +365,7 @@ public class CassandraDaemon
         thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
 
         // Native transport
-        InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
-        int nativePort = DatabaseDescriptor.getNativeTransportPort();
-        nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
+        nativeTransportService = new NativeTransportService();
 
         completeSetup();
     }
@@ -431,7 +429,8 @@ public class CassandraDaemon
         String nativeFlag = System.getProperty("cassandra.start_native_transport");
         if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag
== null && DatabaseDescriptor.startNativeTransport()))
         {
-            nativeServer.start();
+            startNativeTransport();
+            StorageService.instance.setRpcReady(true);
         }
         else
             logger.info("Not starting native transport as requested. Use JMX (StorageService->startNativeTransport())
or nodetool (enablebinary) to start it");
@@ -453,9 +452,12 @@ public class CassandraDaemon
         // On linux, this doesn't entirely shut down Cassandra, just the RPC server.
         // jsvc takes care of taking the rest down
         logger.info("Cassandra shutting down...");
-        thriftServer.stop();
-        nativeServer.stop();
-
+        if (thriftServer != null)
+            thriftServer.stop();
+        if (nativeTransportService != null)
+            nativeTransportService.destroy();
+        StorageService.instance.setRpcReady(false);
+        
         // On windows, we need to stop the entire system as prunsrv doesn't have the jsvc
hooks
         // We rely on the shutdown hook to drain the node
         if (FBUtilities.isWindows())
@@ -556,6 +558,26 @@ public class CassandraDaemon
         }
     }
 
+    public void startNativeTransport()
+    {
+        if (nativeTransportService == null)
+            throw new IllegalStateException("setup() must be called first for CassandraDaemon");
+        else
+            nativeTransportService.start();
+    }
+
+    public void stopNativeTransport()
+    {
+        if (nativeTransportService != null)
+            nativeTransportService.stop();
+    }
+
+    public boolean isNativeTransportRunning()
+    {
+        return nativeTransportService != null ? nativeTransportService.isRunning() : false;
+    }
+
+
     /**
      * A convenience method to stop and destroy the daemon in one shot.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/service/NativeTransportService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java b/src/java/org/apache/cassandra/service/NativeTransportService.java
new file mode 100644
index 0000000..eff3a89
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.metrics.ClientMetrics;
+import org.apache.cassandra.transport.RequestThreadPoolExecutor;
+import org.apache.cassandra.transport.Server;
+
+/**
+ * Handles native transport server lifecycle and associated resources. Lazily initialized.
+ */
+public class NativeTransportService
+{
+
+    private static final Logger logger = LoggerFactory.getLogger(NativeTransportService.class);
+
+    private Collection<Server> servers = Collections.emptyList();
+
+    private boolean initialized = false;
+    private EventLoopGroup workerGroup;
+    private EventExecutor eventExecutorGroup;
+
+    /**
+     * Creates netty thread pools and event loops.
+     */
+    @VisibleForTesting
+    synchronized void initialize()
+    {
+        if (initialized)
+            return;
+
+        // prepare netty resources
+        eventExecutorGroup = new RequestThreadPoolExecutor();
+
+        if (useEpoll())
+        {
+            workerGroup = new EpollEventLoopGroup();
+            logger.info("Netty using native Epoll event loop");
+        }
+        else
+        {
+            workerGroup = new NioEventLoopGroup();
+            logger.info("Netty using Java NIO event loop");
+        }
+
+        int nativePort = DatabaseDescriptor.getNativeTransportPort();
+        int nativePortSSL = DatabaseDescriptor.getNativeTransportPortSSL();
+        InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
+
+        org.apache.cassandra.transport.Server.Builder builder = new org.apache.cassandra.transport.Server.Builder()
+                                                                .withEventExecutor(eventExecutorGroup)
+                                                                .withEventLoopGroup(workerGroup)
+                                                                .withHost(nativeAddr);
+
+        if (!DatabaseDescriptor.getClientEncryptionOptions().enabled)
+        {
+            servers = Collections.singleton(builder.withSSL(false).withPort(nativePort).build());
+        }
+        else
+        {
+            if (nativePort != nativePortSSL)
+            {
+                // user asked for dedicated ssl port for supporting both non-ssl and ssl
connections
+                servers = Collections.unmodifiableList(
+                                                      Arrays.asList(
+                                                                   builder.withSSL(false).withPort(nativePort).build(),
+                                                                   builder.withSSL(true).withPort(nativePortSSL).build()
+                                                      )
+                );
+            }
+            else
+            {
+                // ssl only mode using configured native port
+                servers = Collections.singleton(builder.withSSL(true).withPort(nativePort).build());
+            }
+        }
+
+        // register metrics
+        ClientMetrics.instance.addCounter("connectedNativeClients", () ->
+        {
+            int ret = 0;
+            for (Server server : servers)
+                ret += server.getConnectedClients();
+            return ret;
+        });
+
+        initialized = true;
+    }
+
+    /**
+     * Starts native transport servers.
+     */
+    public void start()
+    {
+        initialize();
+        servers.forEach(Server::start);
+    }
+
+    /**
+     * Stops currently running native transport servers.
+     */
+    public void stop()
+    {
+        servers.forEach(Server::stop);
+    }
+
+    /**
+     * Ultimately stops servers and closes all resources.
+     */
+    public void destroy()
+    {
+        stop();
+        servers = Collections.emptyList();
+
+        // shutdown executors used by netty for native transport server
+        Future<?> wgStop = workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
+
+        try
+        {
+            wgStop.await(5000);
+        }
+        catch (InterruptedException e1)
+        {
+            Thread.currentThread().interrupt();
+        }
+
+        // shutdownGracefully not implemented yet in RequestThreadPoolExecutor
+        eventExecutorGroup.shutdown();
+    }
+
+    /**
+     * @return intend to use epoll bassed event looping
+     */
+    public static boolean useEpoll()
+    {
+        final boolean enableEpoll = Boolean.valueOf(System.getProperty("cassandra.native.epoll.enabled",
"true"));
+        return enableEpoll && Epoll.isAvailable();
+    }
+
+    /**
+     * @return true in case native transport server is running
+     */
+    public boolean isRunning()
+    {
+        for (Server server : servers)
+            if (server.isRunning()) return true;
+        return false;
+    }
+
+    @VisibleForTesting
+    EventLoopGroup getWorkerGroup()
+    {
+        return workerGroup;
+    }
+
+    @VisibleForTesting
+    EventExecutor getEventExecutor()
+    {
+        return eventExecutorGroup;
+    }
+
+    @VisibleForTesting
+    Collection<Server> getServers()
+    {
+        return servers;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a7ffc04..2d9bbec 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -405,10 +405,10 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
         {
             throw new IllegalStateException("No configured daemon");
         }
-
+        
         try
         {
-            daemon.nativeServer.start();
+            daemon.startNativeTransport();
         }
         catch (Exception e)
         {
@@ -422,17 +422,16 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
         {
             throw new IllegalStateException("No configured daemon");
         }
-        if (daemon.nativeServer != null)
-            daemon.nativeServer.stop();
+        daemon.stopNativeTransport();
     }
 
     public boolean isNativeTransportRunning()
     {
-        if ((daemon == null) || (daemon.nativeServer == null))
+        if (daemon == null)
         {
             return false;
         }
-        return daemon.nativeServer.isRunning();
+        return daemon.isNativeTransportRunning();
     }
 
     public void stopTransports()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 72a1b60..cafc0ce 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -22,9 +22,8 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.EnumMap;
-import java.util.Map;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.net.ssl.SSLContext;
@@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.*;
-import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.group.ChannelGroup;
@@ -51,7 +49,6 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.transport.messages.EventMessage;
@@ -64,7 +61,7 @@ public class Server implements CassandraDaemon.Server
     }
 
     private static final Logger logger = LoggerFactory.getLogger(Server.class);
-    private static final boolean enableEpoll = Boolean.valueOf(System.getProperty("cassandra.native.epoll.enabled",
"true"));
+    private static final boolean useEpoll = NativeTransportService.useEpoll();
 
     public static final int VERSION_1 = 1;
     public static final int VERSION_2 = 2;
@@ -83,41 +80,32 @@ public class Server implements CassandraDaemon.Server
     };
 
     public final InetSocketAddress socket;
+    public boolean useSSL = false;
     private final AtomicBoolean isRunning = new AtomicBoolean(false);
 
     private EventLoopGroup workerGroup;
     private EventExecutor eventExecutorGroup;
 
-    public Server(InetSocketAddress socket)
+    private Server (Builder builder)
     {
-        this.socket = socket;
+        this.socket = builder.getSocket();
+        this.useSSL = builder.useSSL;
+        if (builder.workerGroup != null)
+        {
+            workerGroup = builder.workerGroup;
+        }
+        else
+        {
+            if (useEpoll)
+                workerGroup = new EpollEventLoopGroup();
+            else
+                workerGroup = new NioEventLoopGroup();
+        }
+        if (builder.eventExecutorGroup != null)
+            eventExecutorGroup = builder.eventExecutorGroup;
         EventNotifier notifier = new EventNotifier(this);
         StorageService.instance.register(notifier);
         MigrationManager.instance.register(notifier);
-        registerMetrics();
-    }
-
-    public Server(String hostname, int port)
-    {
-        this(new InetSocketAddress(hostname, port));
-    }
-
-    public Server(InetAddress host, int port)
-    {
-        this(new InetSocketAddress(host, port));
-    }
-
-    public Server(int port)
-    {
-        this(new InetSocketAddress(port));
-    }
-
-    public void start()
-    {
-	    if(!isRunning())
-	    {
-            run();
-	    }
     }
 
     public void stop()
@@ -131,35 +119,25 @@ public class Server implements CassandraDaemon.Server
         return isRunning.get();
     }
 
-    private void run()
+    public synchronized void start()
     {
-        // Configure the server.
-        eventExecutorGroup = new RequestThreadPoolExecutor();
-
-        boolean hasEpoll = enableEpoll ? Epoll.isAvailable() : false;
-        if (hasEpoll)
-        {
-            workerGroup = new EpollEventLoopGroup();
-            logger.info("Netty using native Epoll event loop");
-        }
-        else
-        {
-            workerGroup = new NioEventLoopGroup();
-            logger.info("Netty using Java NIO event loop");
-        }
+        if(isRunning()) 
+            return;
 
+        // Configure the server.
         ServerBootstrap bootstrap = new ServerBootstrap()
-                                    .group(workerGroup)
-                                    .channel(hasEpoll ? EpollServerSocketChannel.class :
NioServerSocketChannel.class)
+                                    .channel(useEpoll ? EpollServerSocketChannel.class :
NioServerSocketChannel.class)
                                     .childOption(ChannelOption.TCP_NODELAY, true)
                                     .childOption(ChannelOption.SO_LINGER, 0)
                                     .childOption(ChannelOption.SO_KEEPALIVE, DatabaseDescriptor.getRpcKeepAlive())
                                     .childOption(ChannelOption.ALLOCATOR, CBUtil.allocator)
                                     .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK,
32 * 1024)
                                     .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK,
8 * 1024);
+        if (workerGroup != null)
+            bootstrap = bootstrap.group(workerGroup);
 
         final EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
-        if (clientEnc.enabled)
+        if (this.useSSL)
         {
             logger.info("Enabling encrypted CQL connections between client and server");
             bootstrap.childHandler(new SecureInitializer(this, clientEnc));
@@ -171,7 +149,7 @@ public class Server implements CassandraDaemon.Server
 
         // Bind and start to accept incoming connections.
         logger.info("Using Netty Version: {}", Version.identify().entrySet());
-        logger.info("Starting listening for CQL clients on {}...", socket);
+        logger.info("Starting listening for CQL clients on {} ({})...", socket, this.useSSL
? "encrypted" : "unencrypted");
 
         ChannelFuture bindFuture = bootstrap.bind(socket);
         if (!bindFuture.awaitUninterruptibly().isSuccess())
@@ -179,36 +157,83 @@ public class Server implements CassandraDaemon.Server
 
         connectionTracker.allChannels.add(bindFuture.channel());
         isRunning.set(true);
-
-        StorageService.instance.setRpcReady(true);
     }
 
-    private void registerMetrics()
+    public int getConnectedClients()
     {
-        ClientMetrics.instance.addCounter("connectedNativeClients", new Callable<Integer>()
-        {
-            @Override
-            public Integer call() throws Exception
-            {
-                return connectionTracker.getConnectedClients();
-            }
-        });
+        return connectionTracker.getConnectedClients();
     }
-
+    
     private void close()
     {
         // Close opened connections
         connectionTracker.closeAll();
-        workerGroup.shutdownGracefully();
-        workerGroup = null;
-
-        eventExecutorGroup.shutdown();
-        eventExecutorGroup = null;
+        
         logger.info("Stop listening for CQL clients");
-
-        StorageService.instance.setRpcReady(false);
     }
 
+    public static class Builder
+    {
+        private EventLoopGroup workerGroup;
+        private EventExecutor eventExecutorGroup;
+        private boolean useSSL = false;
+        private InetAddress hostAddr;
+        private int port = -1;
+        private InetSocketAddress socket;
+
+        public Builder withSSL(boolean useSSL)
+        {
+            this.useSSL = useSSL;
+            return this;
+        }
+
+        public Builder withEventLoopGroup(EventLoopGroup eventLoopGroup)
+        {
+            this.workerGroup = eventLoopGroup;
+            return this;
+        }
+
+        public Builder withEventExecutor(EventExecutor eventExecutor)
+        {
+            this.eventExecutorGroup = eventExecutor;
+            return this;
+        }
+
+        public Builder withHost(InetAddress host)
+        {
+            this.hostAddr = host;
+            this.socket = null;
+            return this;
+        }
+
+        public Builder withPort(int port)
+        {
+            this.port = port;
+            this.socket = null;
+            return this;
+        }
+
+        public Server build()
+        {
+            return new Server(this);
+        }
+
+        private InetSocketAddress getSocket()
+        {
+            if (this.socket != null)
+                return this.socket;
+            else
+            {
+                if (this.port == -1)
+                    throw new IllegalStateException("Missing port number");
+                if (this.hostAddr != null)
+                    this.socket = new InetSocketAddress(this.hostAddr, this.port);
+                else
+                    throw new IllegalStateException("Missing host");
+                return this.socket;
+            }
+        }
+    }
 
     public static class ConnectionTracker implements Connection.Tracker
     {
@@ -253,7 +278,7 @@ public class Server implements CassandraDaemon.Server
         }
     }
 
-    private static class Initializer extends ChannelInitializer
+    private static class Initializer extends ChannelInitializer<Channel>
     {
         // Stateless handlers
         private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
@@ -294,7 +319,10 @@ public class Server implements CassandraDaemon.Server
             pipeline.addLast("messageDecoder", messageDecoder);
             pipeline.addLast("messageEncoder", messageEncoder);
 
-            pipeline.addLast(server.eventExecutorGroup, "executor", dispatcher);
+            if (server.eventExecutorGroup != null)
+                pipeline.addLast(server.eventExecutorGroup, "executor", dispatcher);
+            else
+                pipeline.addLast("executor", dispatcher);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 349975d..3d3729a 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -316,7 +316,7 @@ public abstract class CQLTester
         StorageService.instance.initServer();
         SchemaLoader.startGossiper();
 
-        server = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
+        server = new Server.Builder().withHost(nativeAddr).withPort(nativePort).build();
         server.start();
 
         for (int version = 1; version <= maxProtocolVersion; version++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
new file mode 100644
index 0000000..7eb664f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.Arrays;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class NativeTransportServiceTest
+{
+
+    @After
+    public void resetConfig()
+    {
+        DatabaseDescriptor.getClientEncryptionOptions().enabled = false;
+        DatabaseDescriptor.setNativeTransportPortSSL(null);
+    }
+
+    @Test
+    public void testServiceCanBeStopped()
+    {
+        withService((NativeTransportService service) -> {
+            service.stop();
+            assertFalse(service.isRunning());
+        });
+    }
+
+    @Test
+    public void testIgnoresStartOnAlreadyStarted()
+    {
+        withService((NativeTransportService service) -> {
+            service.start();
+            service.start();
+            service.start();
+        });
+    }
+
+    @Test
+    public void testIgnoresStoppedOnAlreadyStopped()
+    {
+        withService((NativeTransportService service) -> {
+            service.stop();
+            service.stop();
+            service.stop();
+        });
+    }
+
+    @Test
+    public void testDestroy()
+    {
+        withService((NativeTransportService service) -> {
+            Supplier<Boolean> allTerminated = () ->
+                                              service.getWorkerGroup().isShutdown() &&
service.getWorkerGroup().isTerminated() &&
+                                              service.getEventExecutor().isShutdown() &&
service.getEventExecutor().isTerminated();
+            assertFalse(allTerminated.get());
+            service.destroy();
+            assertTrue(allTerminated.get());
+        });
+    }
+
+    @Test
+    public void testConcurrentStarts()
+    {
+        withService(NativeTransportService::start, false, 20);
+    }
+
+    @Test
+    public void testConcurrentStops()
+    {
+        withService(NativeTransportService::stop, true, 20);
+    }
+
+    @Test
+    public void testConcurrentDestroys()
+    {
+        withService(NativeTransportService::destroy, true, 20);
+    }
+
+    @Test
+    public void testPlainDefaultPort()
+    {
+        // default plain settings: client encryption disabled and default native transport
port 
+        withService((NativeTransportService service) ->
+                    {
+                        assertEquals(1, service.getServers().size());
+                        Server server = service.getServers().iterator().next();
+                        assertFalse(server.useSSL);
+                        assertEquals(server.socket.getPort(), DatabaseDescriptor.getNativeTransportPort());
+                    });
+    }
+
+    @Test
+    public void testSSLOnly()
+    {
+        // default ssl settings: client encryption enabled and default native transport port
used for ssl only
+        DatabaseDescriptor.getClientEncryptionOptions().enabled = true;
+
+        withService((NativeTransportService service) ->
+                    {
+                        service.initialize();
+                        assertEquals(1, service.getServers().size());
+                        Server server = service.getServers().iterator().next();
+                        assertTrue(server.useSSL);
+                        assertEquals(server.socket.getPort(), DatabaseDescriptor.getNativeTransportPort());
+                    }, false, 1);
+    }
+
+    @Test
+    public void testSSLWithNonSSL()
+    {
+        // ssl+non-ssl settings: client encryption enabled and additional ssl port specified
+        DatabaseDescriptor.getClientEncryptionOptions().enabled = true;
+        DatabaseDescriptor.setNativeTransportPortSSL(8432);
+
+        withService((NativeTransportService service) ->
+                    {
+                        service.initialize();
+                        assertEquals(2, service.getServers().size());
+                        assertEquals(
+                                    Sets.newHashSet(Arrays.asList(
+                                                                 Pair.create(true, DatabaseDescriptor.getNativeTransportPortSSL()),
+                                                                 Pair.create(false, DatabaseDescriptor.getNativeTransportPort())
+                                                    )
+                                    ),
+                                    service.getServers().stream().map((Server s) ->
+                                                                      Pair.create(s.useSSL,
s.socket.getPort())).collect(Collectors.toSet())
+                        );
+                    }, false, 1);
+    }
+
+    private static void withService(Consumer<NativeTransportService> f)
+    {
+        withService(f, true, 1);
+    }
+
+    private static void withService(Consumer<NativeTransportService> f, boolean start,
int concurrently)
+    {
+        NativeTransportService service = new NativeTransportService();
+        assertFalse(service.isRunning());
+        if (start)
+        {
+            service.start();
+            assertTrue(service.isRunning());
+        }
+        try
+        {
+            if (concurrently == 1)
+            {
+                f.accept(service);
+            }
+            else
+            {
+                IntStream.range(0, concurrently).parallel().map((int i) -> {
+                    f.accept(service);
+                    return 1;
+                }).sum();
+            }
+        }
+        finally
+        {
+            service.stop();
+        }
+    }
+}


Mime
View raw message