cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [1/3] git commit: add client encryption support to sstableloader patch by Sam Tunnicliffe; reviewed by Mikhail Stepura for CASSANDRA-6378
Date Wed, 18 Dec 2013 22:18:04 GMT
Updated Branches:
  refs/heads/cassandra-2.0 21bb53146 -> 1b2a19037
  refs/heads/trunk 1152e4b39 -> 2e4d709d1


add client encryption support to sstableloader
patch by Sam Tunnicliffe; reviewed by Mikhail Stepura for CASSANDRA-6378


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1b2a1903
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1b2a1903
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1b2a1903

Branch: refs/heads/cassandra-2.0
Commit: 1b2a190379141094a986495bd1386e720786c9b7
Parents: 21bb531
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Wed Dec 18 16:17:13 2013 -0600
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Wed Dec 18 16:17:13 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/tools/BulkLoader.java  | 130 ++++++++++++++++++-
 2 files changed, 124 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b2a1903/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b876204..d6223be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.4
+ * add client encryption support to sstableloader (CASSANDRA-6378)
  * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
  * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
  * Fix assertion failure in filterColdSSTables (CASSANDRA-6483)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b2a1903/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index c89bb83..15c8df8 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -24,7 +24,9 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.commons.cli.*;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -58,12 +60,21 @@ public class BulkLoader
     private static final String USER_OPTION = "username";
     private static final String PASSWD_OPTION = "password";
     private static final String THROTTLE_MBITS = "throttle";
+    private static final String TRANSPORT_FACTORY = "transport-factory";
+    private static final String SSL_TRUSTSTORE = "truststore";
+    private static final String SSL_TRUSTSTORE_PW = "truststore-password";
+    private static final String SSL_KEYSTORE = "keystore";
+    private static final String SSL_KEYSTORE_PW = "keystore-password";
+    private static final String SSL_PROTOCOL = "ssl-protocol";
+    private static final String SSL_ALGORITHM = "ssl-alg";
+    private static final String SSL_STORE_TYPE = "store-type";
+    private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
 
     public static void main(String args[])
     {
         LoaderOptions options = LoaderOptions.parseArgs(args);
         OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
-        SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts,
options.rpcPort, options.user, options.passwd), handler);
+        SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts,
options.rpcPort, options.user, options.passwd, options.transportFactory), handler);
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
         StreamResultFuture future = loader.stream(options.ignores);
         future.addEventListener(new ProgressIndicator());
@@ -175,14 +186,16 @@ public class BulkLoader
         private final int rpcPort;
         private final String user;
         private final String passwd;
+        private final ITransportFactory transportFactory;
 
-        public ExternalClient(Set<InetAddress> hosts, int port, String user, String
passwd)
+        public ExternalClient(Set<InetAddress> hosts, int port, String user, String
passwd, ITransportFactory transportFactory)
         {
             super();
             this.hosts = hosts;
             this.rpcPort = port;
             this.user = user;
             this.passwd = passwd;
+            this.transportFactory = transportFactory;
         }
 
         public void init(String keyspace)
@@ -194,7 +207,7 @@ public class BulkLoader
                 {
                     // Query endpoint to ranges map and schemas from thrift
                     InetAddress host = hostiter.next();
-                    Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort,
this.user, this.passwd);
+                    Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort,
this.user, this.passwd, this.transportFactory);
 
                     setPartitioner(client.describe_partitioner());
                     Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
@@ -233,11 +246,9 @@ public class BulkLoader
             return knownCfs.get(cfName);
         }
 
-        private static Cassandra.Client createThriftClient(String host, int port, String
user, String passwd) throws Exception
+        private static Cassandra.Client createThriftClient(String host, int port, String
user, String passwd, ITransportFactory transportFactory) throws Exception
         {
-            TSocket socket = new TSocket(host, port);
-            TTransport trans = new TFramedTransport(socket);
-            trans.open();
+            TTransport trans = transportFactory.openTransport(host, port);
             TProtocol protocol = new TBinaryProtocol(trans);
             Cassandra.Client client = new Cassandra.Client(protocol);
             if (user != null && passwd != null)
@@ -263,6 +274,8 @@ public class BulkLoader
         public String user;
         public String passwd;
         public int throttle = 0;
+        public ITransportFactory transportFactory = new TFramedTransportFactory();
+        public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
 
         public final Set<InetAddress> hosts = new HashSet<InetAddress>();
         public final Set<InetAddress> ignores = new HashSet<InetAddress>();
@@ -367,6 +380,55 @@ public class BulkLoader
                     }
                 }
 
+                if(cmd.hasOption(SSL_TRUSTSTORE))
+                {
+                    opts.encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
+                }
+
+                if(cmd.hasOption(SSL_TRUSTSTORE_PW))
+                {
+                    opts.encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW);
+                }
+
+                if(cmd.hasOption(SSL_KEYSTORE))
+                {
+                    opts.encOptions.keystore = cmd.getOptionValue(SSL_KEYSTORE);
+                    // if a keystore was provided, lets assume we'll need to use it
+                    opts.encOptions.require_client_auth = true;
+                }
+
+                if(cmd.hasOption(SSL_KEYSTORE_PW))
+                {
+                    opts.encOptions.keystore_password = cmd.getOptionValue(SSL_KEYSTORE_PW);
+                }
+
+                if(cmd.hasOption(SSL_PROTOCOL))
+                {
+                    opts.encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL);
+                }
+
+                if(cmd.hasOption(SSL_ALGORITHM))
+                {
+                    opts.encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM);
+                }
+
+                if(cmd.hasOption(SSL_STORE_TYPE))
+                {
+                    opts.encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE);
+                }
+
+                if(cmd.hasOption(SSL_CIPHER_SUITES))
+                {
+                    opts.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
+                }
+
+                if (cmd.hasOption(TRANSPORT_FACTORY))
+                {
+                    ITransportFactory transportFactory = getTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY));
+                    configureTransportFactory(transportFactory, opts);
+                    opts.transportFactory = transportFactory;
+                }
+
                 return opts;
             }
             catch (ParseException e)
@@ -376,6 +438,50 @@ public class BulkLoader
             }
         }
 
+        private static ITransportFactory getTransportFactory(String transportFactory)
+        {
+            try
+            {
+                Class<?> factory = Class.forName(transportFactory);
+                if (!ITransportFactory.class.isAssignableFrom(factory))
+                    throw new IllegalArgumentException(String.format("transport factory '%s'
" +
+                            "not derived from ITransportFactory", transportFactory));
+                return (ITransportFactory) factory.newInstance();
+            }
+            catch (Exception e)
+            {
+                throw new IllegalArgumentException(String.format("Cannot create a transport
factory '%s'.", transportFactory), e);
+            }
+        }
+
+        private static void configureTransportFactory(ITransportFactory transportFactory,
LoaderOptions opts)
+        {
+            Map<String, String> options = new HashMap<>();
+            // If the supplied factory supports the same set of options as our SSL impl,
set those 
+            if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE))
+                options.put(SSLTransportFactory.TRUSTSTORE, opts.encOptions.truststore);
+            if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD))
+                options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, opts.encOptions.truststore_password);
+            if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL))
+                options.put(SSLTransportFactory.PROTOCOL, opts.encOptions.protocol);
+            if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES))
+                options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(opts.encOptions.cipher_suites));
+
+            if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE)
+                    && opts.encOptions.require_client_auth)
+                options.put(SSLTransportFactory.KEYSTORE, opts.encOptions.keystore);
+            if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD)
+                    && opts.encOptions.require_client_auth)
+                options.put(SSLTransportFactory.KEYSTORE_PASSWORD, opts.encOptions.keystore_password);
+
+            // Now check if any of the factory's supported options are set as system properties
+            for (String optionKey : transportFactory.supportedOptions())
+                if (System.getProperty(optionKey) != null)
+                    options.put(optionKey, System.getProperty(optionKey));
+
+            transportFactory.setOptions(options);
+        }
+
         private static void errorMsg(String msg, CmdLineOptions options)
         {
             System.err.println(msg);
@@ -395,6 +501,16 @@ public class BulkLoader
             options.addOption("t",  THROTTLE_MBITS, "throttle", "throttle speed in Mbits
(default unlimited)");
             options.addOption("u",  USER_OPTION, "username", "username for cassandra authentication");
             options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
+            options.addOption("tf", TRANSPORT_FACTORY, "transport factory", "Fully-qualified
ITransportFactory class name for creating a connection to cassandra");
+            // ssl connection-related options
+            options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore");
+            options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "SSL: password
of the truststore");
+            options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "SSL: full path to keystore");
+            options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "SSL: password
of the keystore");
+            options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "SSL: connections protocol
to use (default: TLS)");
+            options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "SSL: algorithm (default:
SunX509)");
+            options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "SSL: type of store");
+            options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "SSL: comma-separated
list of encryption suites to use");
             return options;
         }
 


Mime
View raw message