nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ozhurakou...@apache.org
Subject [2/2] nifi git commit: NIFI-2423: Make use of the SSLContextService to provide SSL information
Date Fri, 05 Aug 2016 18:22:12 GMT
NIFI-2423: Make use of the SSLContextService to provide SSL information

Signed-off-by: Oleg Zhurakousky <oleg@suitcase.io>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dc9a46c6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dc9a46c6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dc9a46c6

Branch: refs/heads/0.x
Commit: dc9a46c637fb76f6b9fc8a841151c6be0c2ac836
Parents: 46a9bba
Author: Mark Payne <markap14@hotmail.com>
Authored: Wed Aug 3 14:20:29 2016 -0400
Committer: Oleg Zhurakousky <oleg@suitcase.io>
Committed: Fri Aug 5 14:21:43 2016 -0400

----------------------------------------------------------------------
 .../nifi-kafka-pubsub-nar/pom.xml               |  5 ++
 .../nifi-kafka-pubsub-processors/pom.xml        |  4 ++
 .../kafka/pubsub/AbstractKafkaProcessor.java    | 59 +++++++++++---------
 .../processors/kafka/pubsub/ConsumeKafka.java   |  1 -
 4 files changed, 43 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/dc9a46c6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml
index 45d6fb5..1254f77 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml
@@ -31,5 +31,10 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-kafka-pubsub-processors</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc9a46c6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
index 3893c3a..7b869dc 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
@@ -35,6 +35,10 @@
             <artifactId>nifi-utils</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
+        <dependency>
 		    <groupId>org.apache.kafka</groupId>
 		    <artifactId>kafka-clients</artifactId>
 		    <version>0.9.0.1</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc9a46c6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
index 04f9365..c2c2321 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
@@ -43,6 +43,7 @@ import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.FormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,29 +116,12 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi
             .expressionLanguageSupported(true)
             .build();
 
-    static final PropertyDescriptor SSL_KEY_PASSWORD = new PropertyDescriptor.Builder()
-            .name("ssl.key.password")
-            .displayName("SSL Key Password")
-            .description("The password of the private key in the key store file. Corresponds to Kafka's 'ssl.key.password' property.")
+    static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+        .name("ssl.context.service")
+        .displayName("SSL Context Service")
+        .description("Specifies the SSL Context Service to use for communicating with Kafka.")
             .required(false)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .sensitive(true)
-            .build();
-    static final PropertyDescriptor SSL_KEYSTORE_PASSWORD = new PropertyDescriptor.Builder()
-            .name("ssl.keystore.password")
-            .displayName("SSK Keystore Password")
-            .description("The store password for the key store file. Corresponds to Kafka's 'ssl.keystore.password' property.")
-            .required(false)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .sensitive(true)
-            .build();
-    static final PropertyDescriptor SSL_TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder()
-            .name("ssl.truststore.password")
-            .displayName("SSL Truststore Password")
-            .description("The password for the trust store file. Corresponds to Kafka's 'ssl.truststore.password' property.")
-            .required(false)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .sensitive(true)
+        .identifiesControllerService(SSLContextService.class)
             .build();
 
     static final Builder MESSAGE_DEMARCATOR_BUILDER = new PropertyDescriptor.Builder()
@@ -166,9 +150,7 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi
         SHARED_DESCRIPTORS.add(CLIENT_ID);
         SHARED_DESCRIPTORS.add(SECURITY_PROTOCOL);
         SHARED_DESCRIPTORS.add(KERBEROS_PRINCIPLE);
-        SHARED_DESCRIPTORS.add(SSL_KEY_PASSWORD);
-        SHARED_DESCRIPTORS.add(SSL_KEYSTORE_PASSWORD);
-        SHARED_DESCRIPTORS.add(SSL_TRUSTSTORE_PASSWORD);
+        SHARED_DESCRIPTORS.add(SSL_CONTEXT_SERVICE);
 
         SHARED_RELATIONSHIPS.add(REL_SUCCESS);
     }
@@ -347,6 +329,13 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi
     Properties buildKafkaProperties(ProcessContext context) {
         Properties properties = new Properties();
         for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
+            if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
+                // Translate SSLContext Service configuration into Kafka properties
+                final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+                buildSSLKafkaProperties(sslContextService, properties);
+                continue;
+            }
+
             String pName = propertyDescriptor.getName();
             String pValue = propertyDescriptor.isExpressionLanguageSupported()
                     ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
@@ -360,4 +349,24 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi
         }
         return properties;
     }
+
+    private void buildSSLKafkaProperties(final SSLContextService sslContextService, final Properties properties) {
+        if (sslContextService == null) {
+            return;
+        }
+
+        if (sslContextService.isKeyStoreConfigured()) {
+            properties.setProperty("ssl.keystore.location", sslContextService.getKeyStoreFile());
+            properties.setProperty("ssl.keystore.password", sslContextService.getKeyStorePassword());
+            final String keyPass = sslContextService.getKeyPassword() == null ? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword();
+            properties.setProperty("ssl.key.password", keyPass);
+            properties.setProperty("ssl.keystore.type", sslContextService.getKeyStoreType());
+        }
+
+        if (sslContextService.isTrustStoreConfigured()) {
+            properties.setProperty("ssl.truststore.location", sslContextService.getTrustStoreFile());
+            properties.setProperty("ssl.truststore.password", sslContextService.getTrustStorePassword());
+            properties.setProperty("ssl.truststore.type", sslContextService.getTrustStoreType());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc9a46c6/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 2bc1cfb..f51c064 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -242,7 +242,6 @@ public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[]
             this.checkIfInitialConnectionPossible();
         }
 
-        System.out.println(kafkaProperties);
         if (!kafkaProperties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
             kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         }


Mime
View raw message