cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject git commit: Support client Thrift SSL socket patch by Jason Brown; reviewed by Vijay for CASSANDRA-4239
Date Thu, 18 Oct 2012 01:58:49 GMT
Updated Branches:
  refs/heads/trunk 1d489dc55 -> 8495560c2


Support client Thrift SSL socket
patch by Jason Brown; reviewed by Vijay for CASSANDRA-4239


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8495560c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8495560c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8495560c

Branch: refs/heads/trunk
Commit: 8495560c2ba71621ab4326de7a87e9f3222ab8e4
Parents: 1d489dc
Author: Vijay Parthasarathy <vijay2win@gmail.com>
Authored: Wed Oct 17 18:58:04 2012 -0700
Committer: Vijay Parthasarathy <vijay2win@gmail.com>
Committed: Wed Oct 17 18:58:04 2012 -0700

----------------------------------------------------------------------
 conf/cassandra.yaml                                |   17 ++++-
 src/java/org/apache/cassandra/cli/CliOptions.java  |   47 ++++++++++
 .../org/apache/cassandra/cli/CliSessionState.java  |    2 +
 .../cli/transport/SSLTransportFactory.java         |   47 ++++++++++
 src/java/org/apache/cassandra/config/Config.java   |    5 +-
 .../cassandra/config/DatabaseDescriptor.java       |   16 +++-
 .../org/apache/cassandra/net/MessagingService.java |    4 +-
 .../cassandra/net/OutboundTcpConnectionPool.java   |    6 +-
 .../apache/cassandra/thrift/CustomTHsHaServer.java |    4 +
 .../cassandra/thrift/CustomTNonBlockingServer.java |    5 +
 .../cassandra/thrift/CustomTThreadPoolServer.java  |   19 ++++-
 .../cassandra/thrift/TCustomServerSocket.java      |    9 ++
 test/conf/cassandra.yaml                           |    2 +-
 .../cli/transport/SSLTransportFactory.java         |   47 ++++++++++
 .../src/org/apache/cassandra/stress/Session.java   |   69 +++++++++++++-
 15 files changed, 283 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index f312b36..0a261c8 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -605,7 +605,7 @@ index_interval: 128
 # the keystore and truststore.  For instructions on generating these files, see:
 # http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore
 #
-encryption_options:
+server_encryption_options:
     internode_encryption: none
     keystore: conf/.keystore
     keystore_password: cassandra
@@ -617,6 +617,21 @@ encryption_options:
     # store_type: JKS
     # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA]
 
+# enable or disable client/server encryption.
+# The available internode options are: none, all
+client_encryption_options:
+    internode_encryption: none
+    keystore: conf/.keystore
+    keystore_password: cassandra
+    truststore: conf/.truststore
+    truststore_password: cassandra
+    # More advanced defaults below:
+    # protocol: TLS
+    # algorithm: SunX509
+    # store_type: JKS
+    # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA]
+
+
 # internode_compression controls whether traffic between nodes is
 # compressed.
 # can be:  all  - all traffic is compressed

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/src/java/org/apache/cassandra/cli/CliOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliOptions.java b/src/java/org/apache/cassandra/cli/CliOptions.java
index b905336..b684cc4 100644
--- a/src/java/org/apache/cassandra/cli/CliOptions.java
+++ b/src/java/org/apache/cassandra/cli/CliOptions.java
@@ -47,6 +47,13 @@ public class CliOptions
     private static final String VERBOSE_OPTION  = "verbose";
     private static final String SCHEMA_MIGRATION_WAIT_TIME = "schema-mwt";
 
+    private static final String SSL_TRUSTSTORE = "truststore";
+    private static final String SSL_TRUSTSTORE_PW = "truststore-password";
+    private static final String SSL_PROTOCOL = "ssl-protocol";
+    private static final String SSL_ALGORITHM = "ssl-alg";
+    private static final String SSL_STORE_TYPE = "store-type";
+    private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
+
     // Default values for optional command line arguments
     private static final String DEFAULT_HOST        = "127.0.0.1";
     private static final int    DEFAULT_THRIFT_PORT = 9160;
@@ -67,6 +74,14 @@ public class CliOptions
         options.addOption(null, SCHEMA_MIGRATION_WAIT_TIME,  "TIME", "Schema migration wait time (secs.), default is 10 secs");
         options.addOption("tf", TRANSPORT_FACTORY, "TRANSPORT-FACTORY", "Fully-qualified TTransportFactory class name for creating a connection to cassandra");
 
+        // ssl connection-related options
+        options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore");
+        options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "SSL: full path to truststore");
+        options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "SSL: connections protocol to use (default: TLS)");
+        options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "SSL: algorithm (default: SunX509)");
+        options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "SSL: type of store");
+        options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "SSL: comma-separated list of encryption suites to use");
+
         // options without argument
         options.addOption("B",  BATCH_OPTION,   "enabled batch mode (suppress output; errors are fatal)");
         options.addOption(null, DEBUG_OPTION,   "display stack-traces (NOTE: We print strack-traces in the places where it makes sense even without --debug)");
@@ -97,7 +112,9 @@ public class CliOptions
             }
 
             if (cmd.hasOption(TRANSPORT_FACTORY))
+            {
                 css.transportFactory = validateAndSetTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY));
+            }
 
             if (cmd.hasOption(DEBUG_OPTION))
             {
@@ -170,6 +187,36 @@ public class CliOptions
                 css.schema_mwt = Integer.parseInt(cmd.getOptionValue(SCHEMA_MIGRATION_WAIT_TIME)) * 1000;
             }
 
+            if(cmd.hasOption(SSL_TRUSTSTORE))
+            {
+                css.encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
+            }
+
+            if(cmd.hasOption(SSL_TRUSTSTORE_PW))
+            {
+                css.encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW);
+            }
+
+            if(cmd.hasOption(SSL_PROTOCOL))
+            {
+                css.encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL);
+            }
+
+            if(cmd.hasOption(SSL_ALGORITHM))
+            {
+                css.encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM);
+            }
+
+            if(cmd.hasOption(SSL_STORE_TYPE))
+            {
+                css.encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE);
+            }
+
+            if(cmd.hasOption(SSL_CIPHER_SUITES))
+            {
+                css.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
+            }
+
             // Abort if there are any unrecognized arguments left
             if (cmd.getArgs().length > 0)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/src/java/org/apache/cassandra/cli/CliSessionState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliSessionState.java b/src/java/org/apache/cassandra/cli/CliSessionState.java
index 5f29186..08375bb 100644
--- a/src/java/org/apache/cassandra/cli/CliSessionState.java
+++ b/src/java/org/apache/cassandra/cli/CliSessionState.java
@@ -21,6 +21,7 @@ import java.io.InputStream;
 import java.io.PrintStream;
 
 import org.apache.cassandra.cli.transport.FramedTransportFactory;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.thrift.transport.TTransportFactory;
 
@@ -42,6 +43,7 @@ public class CliSessionState
     public boolean verbose = false; // verbose output
     public int     schema_mwt = 10 * 1000;    // Schema migration wait time (secs.)
     public TTransportFactory transportFactory = new FramedTransportFactory();
+    public EncryptionOptions encOptions = new EncryptionOptions();
 
     /*
      * Streams to read/write from

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/src/java/org/apache/cassandra/cli/transport/SSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/transport/SSLTransportFactory.java b/src/java/org/apache/cassandra/cli/transport/SSLTransportFactory.java
new file mode 100644
index 0000000..4aa9fc1
--- /dev/null
+++ b/src/java/org/apache/cassandra/cli/transport/SSLTransportFactory.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cli.transport;
+
+import org.apache.cassandra.cli.CliMain;
+import org.apache.cassandra.cli.CliSessionState;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+public class SSLTransportFactory extends TTransportFactory
+{
+    private static final int SOCKET_TIMEOUT = 0;
+
+    public TTransport getTransport(TTransport trans)
+    {
+        final CliSessionState sessionState = CliMain.sessionState;
+        try
+        {
+            TSSLTransportParameters params = new TSSLTransportParameters(sessionState.encOptions.protocol, sessionState.encOptions.cipher_suites);
+            params.setTrustStore(sessionState.encOptions.truststore, sessionState.encOptions.truststore_password);
+            trans = TSSLTransportFactory.getClientSocket(sessionState.hostName, sessionState.thriftPort, SOCKET_TIMEOUT, params);
+            return new FramedTransportFactory().getTransport(trans);
+        }
+        catch (TTransportException e)
+        {
+            throw new RuntimeException("Failed to create a client SSL connection.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 90746c3..827d8d3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -124,7 +124,10 @@ public class Config
     public RequestSchedulerId request_scheduler_id;
     public RequestSchedulerOptions request_scheduler_options;
 
-    public EncryptionOptions encryption_options = new EncryptionOptions();
+    public EncryptionOptions server_encryption_options = new EncryptionOptions();
+    public EncryptionOptions client_encryption_options = new EncryptionOptions();
+    // this encOptions is for backward compatibility (a warning is logged by DatabaseDescriptor)
+    public EncryptionOptions encryption_options;
 
     public InternodeCompression internode_compression = InternodeCompression.none;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 02a91e8..7d87c23 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -442,6 +442,13 @@ public class DatabaseDescriptor
 
             rowCacheProvider = FBUtilities.newCacheProvider(conf.row_cache_provider);
 
+            if(conf.encryption_options != null)
+            {
+                logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
+                //operate under the assumption that server_encryption_options is not set in yaml rather than both
+                conf.server_encryption_options = conf.encryption_options;
+            }
+
             // Hardcoded system tables
             List<KSMetaData> systemKeyspaces = Arrays.asList(KSMetaData.systemKeyspace(), KSMetaData.traceKeyspace());
             assert systemKeyspaces.size() == Schema.systemKeyspaceNames.size();
@@ -1080,9 +1087,14 @@ public class DatabaseDescriptor
         conf.dynamic_snitch_badness_threshold = dynamicBadnessThreshold;
     }
 
-    public static EncryptionOptions getEncryptionOptions()
+    public static EncryptionOptions getServerEncryptionOptions()
+    {
+        return conf.server_encryption_options;
+    }
+
+    public static EncryptionOptions getClientEncryptionOptions()
     {
-        return conf.encryption_options;
+        return conf.client_encryption_options;
     }
 
     public static double getFlushLargestMemtablesAt()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 63d6e38..06a0270 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -395,11 +395,11 @@ public final class MessagingService implements MessagingServiceMBean
     private List<ServerSocket> getServerSocket(InetAddress localEp) throws ConfigurationException
     {
         final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
-        if (DatabaseDescriptor.getEncryptionOptions().internode_encryption != EncryptionOptions.InternodeEncryption.none)
+        if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != EncryptionOptions.InternodeEncryption.none)
         {
             try
             {
-                ss.add(SSLFactory.getServerSocket(DatabaseDescriptor.getEncryptionOptions(), localEp, DatabaseDescriptor.getSSLStoragePort()));
+                ss.add(SSLFactory.getServerSocket(DatabaseDescriptor.getServerEncryptionOptions(), localEp, DatabaseDescriptor.getSSLStoragePort()));
             }
             catch (IOException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 05a39a1..c9cb8d0 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -117,9 +117,9 @@ public class OutboundTcpConnectionPool
         if (isEncryptedChannel())
         {
             if (Config.getOutboundBindAny())
-                return SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions(), endPoint(), DatabaseDescriptor.getSSLStoragePort());
+                return SSLFactory.getSocket(DatabaseDescriptor.getServerEncryptionOptions(), endPoint(), DatabaseDescriptor.getSSLStoragePort());
             else
-                return SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions(), endPoint(), DatabaseDescriptor.getSSLStoragePort(), FBUtilities.getLocalAddress(), 0);
+                return SSLFactory.getSocket(DatabaseDescriptor.getServerEncryptionOptions(), endPoint(), DatabaseDescriptor.getSSLStoragePort(), FBUtilities.getLocalAddress(), 0);
         }
         else
         {
@@ -137,7 +137,7 @@ public class OutboundTcpConnectionPool
 
     boolean isEncryptedChannel()
     {
-        switch (DatabaseDescriptor.getEncryptionOptions().internode_encryption)
+        switch (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption)
         {
             case none:
                 return false; // if nothing needs to be encrypted then return immediately.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
index 4d3aa62..86445b8 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.service.ThriftSessionManager;
 import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TServer;
@@ -352,6 +353,9 @@ public class CustomTHsHaServer extends TNonblockingServer
     {
         public TServer buildTServer(Args args)
         {
+            if(!DatabaseDescriptor.getClientEncryptionOptions().internode_encryption.equals(EncryptionOptions.InternodeEncryption.none))
+                throw new RuntimeException("Client SSL is not supported for non-blocking sockets (hsha). Please remove client ssl from the configuration.");
+
             final InetSocketAddress addr = args.addr;
             TNonblockingServerTransport serverTransport;
             try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
index 62da846..fa5af8d 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.thrift;
 
 import java.net.InetSocketAddress;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.service.ThriftSessionManager;
 import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TServer;
@@ -46,6 +48,9 @@ public class CustomTNonBlockingServer extends TNonblockingServer
     {
         public TServer buildTServer(Args args)
         {
+            if(!DatabaseDescriptor.getClientEncryptionOptions().internode_encryption.equals(EncryptionOptions.InternodeEncryption.none))
+                throw new RuntimeException("Client SSL is not supported for non-blocking sockets. Please remove client ssl from the configuration.");
+
             final InetSocketAddress addr = args.addr;
             TNonblockingServerTransport serverTransport;
             try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
index 24010b3..04efc97 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
@@ -31,15 +31,19 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.service.ThriftSessionManager;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters;
 
 
 /**
@@ -240,7 +244,20 @@ public class CustomTThreadPoolServer extends TServer
             TServerTransport serverTransport;
             try
             {
-                serverTransport = new TCustomServerSocket(addr, args.keepAlive, args.sendBufferSize, args.recvBufferSize);
+                final EncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
+                if(EncryptionOptions.InternodeEncryption.all == clientEnc.internode_encryption)
+                {
+                    logger.info("enabling encrypted thrift connections between client and server");
+                    TSSLTransportParameters params = new TSSLTransportParameters(clientEnc.protocol, clientEnc.cipher_suites);
+                    params.setKeyStore(clientEnc.keystore, clientEnc.keystore_password);
+                    params.setTrustStore(clientEnc.truststore, clientEnc.truststore_password);
+                    TServerSocket sslServer = TSSLTransportFactory.getServerSocket(addr.getPort(), 0, addr.getAddress(), params);
+                    serverTransport = new TCustomServerSocket(sslServer.getServerSocket(), args.keepAlive, args.sendBufferSize, args.recvBufferSize);
+                }
+                else
+                {
+                    serverTransport = new TCustomServerSocket(addr, args.keepAlive, args.sendBufferSize, args.recvBufferSize);
+                }
             }
             catch (TTransportException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java b/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
index 4107da0..6f1d496 100644
--- a/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
+++ b/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
@@ -80,6 +80,15 @@ public class TCustomServerSocket extends TServerTransport
         this.recvBufferSize = recvBufferSize;
     }
 
+    public TCustomServerSocket(ServerSocket socket, boolean keepAlive, Integer sendBufferSize, Integer recvBufferSize)
+            throws TTransportException
+    {
+        this.serverSocket = socket;
+        this.keepAlive = keepAlive;
+        this.sendBufferSize = sendBufferSize;
+        this.recvBufferSize = recvBufferSize;
+    }
+
     @Override
     protected TCustomSocket acceptImpl() throws TTransportException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 1e722be..782b64c 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -25,7 +25,7 @@ endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
 dynamic_snitch: true
 request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
 request_scheduler_id: keyspace
-encryption_options:
+server_encryption_options:
     internode_encryption: none
     keystore: conf/.keystore
     keystore_password: cassandra

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/tools/stress/src/org/apache/cassandra/cli/transport/SSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/cli/transport/SSLTransportFactory.java b/tools/stress/src/org/apache/cassandra/cli/transport/SSLTransportFactory.java
new file mode 100644
index 0000000..6cc1554
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/cli/transport/SSLTransportFactory.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cli.transport;
+
+import org.apache.cassandra.cli.transport.FramedTransportFactory;
+import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.Stress;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+public class SSLTransportFactory extends TTransportFactory
+{
+    public TTransport getTransport(TTransport trans)
+    {
+        final Session session = Stress.session;
+        try
+        {
+            String hostName = session.nodes[Stress.randomizer.nextInt(session.nodes.length)];
+            TSSLTransportParameters params = new TSSLTransportParameters(session.encOptions.protocol, session.encOptions.cipher_suites);
+            params.setTrustStore(session.encOptions.truststore, session.encOptions.truststore_password);
+            trans = TSSLTransportFactory.getClientSocket(hostName, session.port, 0, params);
+            return new FramedTransportFactory().getTransport(trans);
+        }
+        catch (TTransportException e)
+        {
+            throw new RuntimeException("Failed to create a client SSL connection.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8495560c/tools/stress/src/org/apache/cassandra/stress/Session.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java
index 057b431..dc585ba 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Session.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Session.java
@@ -25,7 +25,9 @@ import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.cassandra.cli.transport.FramedTransportFactory;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.db.marshal.*;
@@ -38,9 +40,9 @@ import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
 
 public class Session implements Serializable
 {
@@ -57,6 +59,13 @@ public class Session implements Serializable
     public final AtomicInteger keys;
     public final AtomicLong    latency;
 
+    private static final String SSL_TRUSTSTORE = "truststore";
+    private static final String SSL_TRUSTSTORE_PW = "truststore-password";
+    private static final String SSL_PROTOCOL = "ssl-protocol";
+    private static final String SSL_ALGORITHM = "ssl-alg";
+    private static final String SSL_STORE_TYPE = "store-type";
+    private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
+
     static
     {
         availableOptions.addOption("h",  "help",                 false,  "Show this help message and exit");
@@ -95,6 +104,13 @@ public class Session implements Serializable
         availableOptions.addOption("Q",  "query-names",          true,   "Comma-separated list of column names to retrieve from each row.");
         availableOptions.addOption("Z",  "compaction-strategy",  true,   "CompactionStrategy to use.");
         availableOptions.addOption("U",  "comparator",           true,   "Column Comparator to use. Currently supported types are: TimeUUIDType, AsciiType, UTF8Type.");
+        availableOptions.addOption("tf", "transport-factory",    true,   "Fully qualified class name for creating a thrift connection");
+        availableOptions.addOption("ts", SSL_TRUSTSTORE,         true, "SSL: full path to truststore");
+        availableOptions.addOption("tspw", SSL_TRUSTSTORE_PW,    true, "SSL: full path to truststore");
+        availableOptions.addOption("prtcl", SSL_PROTOCOL,        true, "SSL: connections protocol to use (default: TLS)");
+        availableOptions.addOption("alg", SSL_ALGORITHM,         true, "SSL: algorithm (default: SunX509)");
+        availableOptions.addOption("st", SSL_STORE_TYPE,         true, "SSL: type of store");
+        availableOptions.addOption("ciphers", SSL_CIPHER_SUITES, true, "SSL: comma-separated list of encryption suites to use");
     }
 
     private int numKeys          = 1000 * 1000;
@@ -104,10 +120,10 @@ public class Session implements Serializable
     private int columns          = 5;
     private int columnSize       = 34;
     private int cardinality      = 50;
-    private String[] nodes       = new String[] { "127.0.0.1" };
+    public String[] nodes        = new String[] { "127.0.0.1" };
     private boolean random       = false;
     private int retryTimes       = 10;
-    private int port             = 9160;
+    public int port              = 9160;
     private int superColumns     = 1;
     private String compression   = null;
     private String compactionStrategy = null;
@@ -144,6 +160,8 @@ public class Session implements Serializable
     public final String comparator;
     public final boolean timeUUIDComparator;
     public double traceProbability = 0.0;
+    public EncryptionOptions encOptions = new EncryptionOptions();
+    public TTransportFactory transportFactory = new FramedTransportFactory();
 
     public Session(String[] arguments) throws IllegalArgumentException, SyntaxException
     {
@@ -383,6 +401,28 @@ public class Session implements Serializable
                 comparator = null;
                 timeUUIDComparator = false;
             }
+
+            if(cmd.hasOption(SSL_TRUSTSTORE))
+                encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
+
+            if(cmd.hasOption(SSL_TRUSTSTORE_PW))
+                encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW);
+
+            if(cmd.hasOption(SSL_PROTOCOL))
+                encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL);
+
+            if(cmd.hasOption(SSL_ALGORITHM))
+                encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM);
+
+            if(cmd.hasOption(SSL_STORE_TYPE))
+                encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE);
+
+            if(cmd.hasOption(SSL_CIPHER_SUITES))
+                encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
+
+            if (cmd.hasOption("tf"))
+                transportFactory = validateAndSetTransportFactory(cmd.getOptionValue("tf"));
+
         }
         catch (ParseException e)
         {
@@ -401,6 +441,24 @@ public class Session implements Serializable
         latency = new AtomicLong();
     }
 
+    private TTransportFactory validateAndSetTransportFactory(String transportFactory)
+    {
+        try
+        {
+            Class factory = Class.forName(transportFactory);
+
+            if(!TTransportFactory.class.isAssignableFrom(factory))
+                throw new IllegalArgumentException(String.format("transport factory '%s' " +
+                        "not derived from TTransportFactory", transportFactory));
+
+            return (TTransportFactory) factory.newInstance();
+        }
+        catch (Exception e)
+        {
+            throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e);
+        }
+    }
+
     public int getCardinality()
     {
         return cardinality;
@@ -645,12 +703,13 @@ public class Session implements Serializable
         String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)];
 
         TSocket socket = new TSocket(currentNode, port);
-        TTransport transport = new TFramedTransport(socket);
+        TTransport transport = transportFactory.getTransport(socket);
         CassandraClient client = new CassandraClient(new TBinaryProtocol(transport));
 
         try
         {
-            transport.open();
+            if(!transport.isOpen())
+                transport.open();
 
             if (enable_cql)
                 client.set_cql_version(cqlVersion);


Mime
View raw message