camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acosent...@apache.org
Subject camel git commit: CAMEL-10705 - Allow to use an SSLContextParameters object for Kafka
Date Fri, 13 Jan 2017 13:43:20 GMT
Repository: camel
Updated Branches:
  refs/heads/master 9584f3851 -> 39742f911


CAMEL-10705 - Allow to use an SSLContextParameters object for Kafka


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/39742f91
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/39742f91
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/39742f91

Branch: refs/heads/master
Commit: 39742f911c7cf7aeec71f880712ccd808e51ccbe
Parents: 9584f38
Author: Antoine DESSAIGNE <antoine.dessaigne@gmail.com>
Authored: Fri Jan 13 14:35:42 2017 +0100
Committer: Antoine DESSAIGNE <antoine.dessaigne@gmail.com>
Committed: Fri Jan 13 14:35:42 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          | 50 ++++++++++++-
 .../component/kafka/KafkaConfiguration.java     | 78 ++++++++++++++++++++
 2 files changed, 127 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/39742f91/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index d100a32..8eacb10 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -49,7 +49,7 @@ The Kafka component supports 1 options which are listed below.
 
 
 // endpoint options: START
-The Kafka component supports 78 endpoint options which are listed below:
+The Kafka component supports 79 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -120,6 +120,7 @@ The Kafka component supports 78 endpoint options which are listed below:
 | saslMechanism | security | GSSAPI | String | The Simple Authentication and Security Layer
(SASL) Mechanism used. For the valid values see http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml
 | securityProtocol | security | PLAINTEXT | String | Protocol used to communicate with brokers.
Currently only PLAINTEXT and SSL are supported.
 | sslCipherSuites | security |  | String | A list of cipher suites. This is a named combination
of authentication encryption MAC and key exchange algorithm used to negotiate the security
settings for a network connection using TLS or SSL network protocol.By default all the available
cipher suites are supported.
+| sslContextParameters | security |  | SSLContextParameters | SSL configuration using a Camel
SSLContextParameters object. If configured it's applied before the other SSL endpoint parameters.
 | sslEnabledProtocols | security | TLSv1.2,TLSv1.1,TLSv1 | String | The list of protocols
enabled for SSL connections. TLSv1.2 TLSv1.1 and TLSv1 are enabled by default.
 | sslEndpointAlgorithm | security |  | String | The endpoint identification algorithm to
validate server hostname using server certificate.
 | sslKeymanagerAlgorithm | security | SunX509 | String | The algorithm used by key manager
factory for SSL connections. Default value is the key manager factory algorithm configured
for the Java Virtual Machine.
@@ -229,6 +230,53 @@ from("direct:start")
     .to("kafka:localhost:9092?topic=test");
 ----------------------------------------------------------------------------
 
+
+#### SSL configuration
+
+You have 2 different ways to configure the SSL communication on the Kafka component.
+
+The first way is through the many SSL endpoint parameters:
+[source,java]
+-------------------------------------------------------------
+from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +
+             "&groupId=A" +
+             "&sslKeystoreLocation=/path/to/keystore.jks" +
+             "&sslKeystorePassword=changeit" +
+             "&sslKeyPassword=changeit")
+        .to("mock:result");
+-------------------------------------------------------------
+
+The second way is to use the `sslContextParameters` endpoint parameter.
+[source,java]
+--------------------------------------------------------------------------------------------------
+// Configure the SSLContextParameters object
+KeyStoreParameters ksp = new KeyStoreParameters();
+ksp.setResource("/path/to/keystore.jks");
+ksp.setPassword("changeit");
+KeyManagersParameters kmp = new KeyManagersParameters();
+kmp.setKeyStore(ksp);
+kmp.setKeyPassword("changeit");
+SSLContextParameters scp = new SSLContextParameters();
+scp.setKeyManagers(kmp);
+
+// Bind this SSLContextParameters into the Camel registry
+JndiRegistry registry = new JndiRegistry();
+registry.bind("ssl", scp);
+
+// Configure the camel context
+DefaultCamelContext camelContext = new DefaultCamelContext(registry);
+camelContext.addRoutes(new RouteBuilder() {
+    @Override
+    public void configure() throws Exception {
+        from("kafka:localhost:{{kafkaPort}}?topic=" + TOPIC +  //
+                     "&groupId=A" +                            //
+                     "&sslContextParameters=#ssl")             // Reference the SSL configuration
+                .to("mock:result");
+    }
+});
+--------------------------------------------------------------------------------------------------
+
+
 ### Endpoints
 
 Camel supports the link:message-endpoint.html[Message Endpoint] pattern

http://git-wip-us.apache.org/repos/asf/camel/blob/39742f91/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 214fd2f..a10ace4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.Metadata;
@@ -27,6 +28,12 @@ import org.apache.camel.spi.StateRepository;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
+import org.apache.camel.util.jsse.CipherSuitesParameters;
+import org.apache.camel.util.jsse.KeyManagersParameters;
+import org.apache.camel.util.jsse.KeyStoreParameters;
+import org.apache.camel.util.jsse.SSLContextParameters;
+import org.apache.camel.util.jsse.SecureSocketProtocolsParameters;
+import org.apache.camel.util.jsse.TrustManagersParameters;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -180,6 +187,10 @@ public class KafkaConfiguration {
     //reconnect.backoff.ms
     @UriParam(label = "producer", defaultValue = "50")
     private Integer reconnectBackoffMs = 50;
+
+    // SSL
+    @UriParam(label = "common,security")
+    private SSLContextParameters sslContextParameters;
     // SSL
     // ssl.key.password
     @UriParam(label = "producer,security", secret = true)
@@ -264,6 +275,7 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, getCompressionCodec());
         addPropertyIfNotNull(props, ProducerConfig.RETRIES_CONFIG, getRetries());
         // SSL
+        applySslConfiguration(props, getSslContextParameters());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
@@ -322,6 +334,7 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
         addPropertyIfNotNull(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, getMaxPollRecords());
         // SSL
+        applySslConfiguration(props, getSslContextParameters());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
@@ -368,6 +381,54 @@ public class KafkaConfiguration {
         return props;
     }
 
+    /**
+     * Fills the Kafka SSL properties (protocol, provider, ciphers, key and trust stores) from a Camel {@link SSLContextParameters}.
+     *
+     * @param props Kafka properties to fill
+     * @param sslContextParameters SSL configuration; nothing is applied when it is {@code null}
+     */
+    private void applySslConfiguration(Properties props, SSLContextParameters sslContextParameters) {
+        if (sslContextParameters != null) {
+            addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, sslContextParameters.getSecureSocketProtocol());
+            addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, sslContextParameters.getProvider());
+
+            CipherSuitesParameters cipherSuites = sslContextParameters.getCipherSuites();
+            if (cipherSuites != null) {
+                addCommaSeparatedList(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, cipherSuites.getCipherSuite());
+            }
+
+            SecureSocketProtocolsParameters secureSocketProtocols = sslContextParameters.getSecureSocketProtocols();
+            if (secureSocketProtocols != null) {
+                addCommaSeparatedList(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, secureSocketProtocols.getSecureSocketProtocol());
+            }
+
+            KeyManagersParameters keyManagers = sslContextParameters.getKeyManagers();
+            if (keyManagers != null) {
+                addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, keyManagers.getAlgorithm());
+                addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyManagers.getKeyPassword());
+
+                KeyStoreParameters keyStore = keyManagers.getKeyStore();
+                if (keyStore != null) {
+                    addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, keyStore.getType());
+                    addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStore.getResource());
+                    addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStore.getPassword());
+                }
+            }
+
+            TrustManagersParameters trustManagers = sslContextParameters.getTrustManagers();
+            if (trustManagers != null) {
+                addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, trustManagers.getAlgorithm());
+
+                KeyStoreParameters trustStore = trustManagers.getKeyStore();
+                if (trustStore != null) {
+                    addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, trustStore.getType());
+                    addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStore.getResource());
+                    addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStore.getPassword());
+                }
+            }
+        }
+    }
+
     private static <T> void addPropertyIfNotNull(Properties props, String key, T value)
{
         if (value != null) {
             // Kafka expects all properties as String
@@ -384,6 +445,12 @@ public class KafkaConfiguration {
         }
     }
 
+    /**
+     * Stores the given values under {@code key} as one comma-separated string; a {@code null} or empty list leaves the properties untouched.
+     */
+    private static void addCommaSeparatedList(Properties props, String key, List<String> values) {
+        if (values == null || values.isEmpty()) {
+            return;
+        }
+        String joined = values.stream().collect(Collectors.joining(","));
+        props.put(key, joined);
+    }
+
     public String getGroupId() {
         return groupId;
     }
@@ -837,6 +904,17 @@ public class KafkaConfiguration {
         this.securityProtocol = securityProtocol;
     }
 
+    /**
+     * Returns the Camel {@link SSLContextParameters} set on this configuration, or {@code null} when none was set.
+     */
+    public SSLContextParameters getSslContextParameters() {
+        return this.sslContextParameters;
+    }
+
+    /**
+     * SSL configuration using a Camel {@link SSLContextParameters} object. If configured it's applied before the other SSL endpoint parameters.
+     */
+    public void setSslContextParameters(SSLContextParameters sslContextParameters) {
+        this.sslContextParameters = sslContextParameters;
+    }
+
     public String getSslKeyPassword() {
         return sslKeyPassword;
     }


Mime
View raw message