nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [nifi] branch master updated: NIFI-4820 This closes #3813. Improving security configuration for Kafka 2.0 processors
Date Tue, 15 Oct 2019 18:06:21 GMT
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new d570fe8  NIFI-4820 This closes #3813. Improving security configuration for Kafka 2.0 processors
d570fe8 is described below

commit d570fe81543b706acf9a797fb05fac68a384b3f4
Author: Bryan Bende <bbende@apache.org>
AuthorDate: Mon Oct 14 14:48:49 2019 -0400

    NIFI-4820 This closes #3813. Improving security configuration for Kafka 2.0 processors
    
    Signed-off-by: Joe Witt <joewitt@apache.org>
---
 .../kafka/pubsub/ConsumeKafkaRecord_2_0.java       |   4 +
 .../kafka/pubsub/KafkaProcessorUtils.java          | 159 ++++++++++++++++++---
 .../kafka/pubsub/PublishKafkaRecord_2_0.java       |   4 +
 .../kafka/pubsub/TestConsumeKafkaRecord_2_0.java   |  56 +++++++-
 4 files changed, 203 insertions(+), 20 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
index fb31e32..7f5c75a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
@@ -232,10 +232,14 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
         descriptors.add(RECORD_WRITER);
         descriptors.add(HONOR_TRANSACTIONS);
         descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
+        descriptors.add(KafkaProcessorUtils.SASL_MECHANISM);
         descriptors.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
         descriptors.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
         descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL);
         descriptors.add(KafkaProcessorUtils.USER_KEYTAB);
+        descriptors.add(KafkaProcessorUtils.USERNAME);
+        descriptors.add(KafkaProcessorUtils.PASSWORD);
+        descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
         descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
         descriptors.add(GROUP_ID);
         descriptors.add(AUTO_OFFSET_RESET);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index bec65d0..5c2fa68 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -16,20 +16,6 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import java.util.regex.Pattern;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -52,6 +38,20 @@ import org.apache.nifi.util.FormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
 final class KafkaProcessorUtils {
     private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
 
@@ -69,11 +69,26 @@ final class KafkaProcessorUtils {
     static final String KAFKA_OFFSET = "kafka.offset";
     static final String KAFKA_TIMESTAMP = "kafka.timestamp";
     static final String KAFKA_COUNT = "kafka.count";
+
     static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
     static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
     static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
     static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL");
 
+    static final String GSSAPI_VALUE = "GSSAPI";
+    static final AllowableValue SASL_MECHANISM_GSSAPI = new AllowableValue(GSSAPI_VALUE, GSSAPI_VALUE,
+            "The mechanism for authentication via Kerberos. The principal and keytab must be provided to the processor " +
+                    "by using a Keytab Credential service, or by specifying the properties directly in the processor.");
+
+    static final String PLAIN_VALUE = "PLAIN";
+    static final AllowableValue SASL_MECHANISM_PLAIN = new AllowableValue(PLAIN_VALUE, PLAIN_VALUE,
+            "The mechanism for authentication via username and password. The username and password properties must " +
+                    "be populated when using this mechanism.");
+
+    static final String SCRAM_SHA256_VALUE = "SCRAM-SHA-256";
+    static final AllowableValue SASL_MECHANISM_SCRAM = new AllowableValue(SCRAM_SHA256_VALUE, SCRAM_SHA256_VALUE,"The Salted Challenge Response Authentication Mechanism. " +
+            "The username and password properties must be set when using this mechanism.");
+
     static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
             .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
             .displayName("Kafka Brokers")
@@ -92,6 +107,15 @@ final class KafkaProcessorUtils {
             .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL)
             .defaultValue(SEC_PLAINTEXT.getValue())
             .build();
+    static final PropertyDescriptor SASL_MECHANISM = new PropertyDescriptor.Builder()
+            .name("sasl.mechanism")
+            .displayName("SASL Mechanism")
+            .description("The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(SASL_MECHANISM_GSSAPI, SASL_MECHANISM_PLAIN, SASL_MECHANISM_SCRAM)
+            .defaultValue(GSSAPI_VALUE)
+            .build();
     static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder()
             .name("sasl.kerberos.service.name")
             .displayName("Kerberos Service Name")
@@ -121,6 +145,31 @@ final class KafkaProcessorUtils {
             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("sasl.username")
+            .displayName("Username")
+            .description("The username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("sasl.password")
+            .displayName("Password")
+            .description("The password for the given username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+    static final PropertyDescriptor TOKEN_AUTH = new PropertyDescriptor.Builder()
+            .name("sasl.token.auth")
+            .displayName("Token Auth")
+            .description("When " + SASL_MECHANISM.getDisplayName() + " is " + SCRAM_SHA256_VALUE + ", this property indicates if token authentication should be used.")
+            .required(false)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
     static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("ssl.context.service")
             .displayName("SSL Context Service")
@@ -140,10 +189,14 @@ final class KafkaProcessorUtils {
         return Arrays.asList(
                 BOOTSTRAP_SERVERS,
                 SECURITY_PROTOCOL,
+                SASL_MECHANISM,
                 JAAS_SERVICE_NAME,
                 KERBEROS_CREDENTIALS_SERVICE,
                 USER_PRINCIPAL,
                 USER_KEYTAB,
+                USERNAME,
+                PASSWORD,
+                TOKEN_AUTH,
                 SSL_CONTEXT_SERVICE
         );
     }
@@ -151,7 +204,8 @@ final class KafkaProcessorUtils {
     static Collection<ValidationResult> validateCommonProperties(final ValidationContext validationContext) {
         List<ValidationResult> results = new ArrayList<>();
 
-        String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
+        final String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
+        final String saslMechanism = validationContext.getProperty(SASL_MECHANISM).getValue();
 
         final String explicitPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
         final String explicitKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
@@ -185,9 +239,10 @@ final class KafkaProcessorUtils {
                 .build());
         }
 
-        // validates that if one of SASL (Kerberos) option is selected for
-        // security protocol, then Kerberos principal is provided as well
-        if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
+        // validates that if the SASL mechanism is GSSAPI (kerberos) AND one of the SASL options is selected
+        // for security protocol, then Kerberos principal is provided as well
+        if (SASL_MECHANISM_GSSAPI.getValue().equals(saslMechanism)
+                && (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol))) {
             String jaasServiceName = validationContext.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
             if (jaasServiceName == null || jaasServiceName.trim().length() == 0) {
                 results.add(new ValidationResult.Builder().subject(JAAS_SERVICE_NAME.getDisplayName()).valid(false)
@@ -207,6 +262,29 @@ final class KafkaProcessorUtils {
             }
         }
 
+        // validate that if SASL Mechanism is PLAIN or SCRAM, then username and password are both provided
+        if (SASL_MECHANISM_PLAIN.getValue().equals(saslMechanism) || SASL_MECHANISM_SCRAM.getValue().equals(saslMechanism)) {
+            final String username = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+            if (StringUtils.isBlank(username)) {
+                results.add(new ValidationResult.Builder()
+                        .subject(USERNAME.getDisplayName())
+                        .valid(false)
+                        .explanation("A username is required when " + SASL_MECHANISM.getDisplayName()
+                                + " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
+                        .build());
+            }
+
+            final String password = validationContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+            if (StringUtils.isBlank(password)) {
+                results.add(new ValidationResult.Builder()
+                        .subject(PASSWORD.getDisplayName())
+                        .valid(false)
+                        .explanation("A password is required when " + SASL_MECHANISM.getDisplayName()
+                                + " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
+                        .build());
+            }
+        }
+
         // If SSL or SASL_SSL then SSLContext Controller Service must be set.
         final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol);
         final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
@@ -356,6 +434,23 @@ final class KafkaProcessorUtils {
      * @param context Context
      */
     private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
+        final String saslMechanism = context.getProperty(SASL_MECHANISM).getValue();
+        switch (saslMechanism) {
+            case GSSAPI_VALUE:
+                setGssApiJaasConfig(mapToPopulate, context);
+                break;
+            case PLAIN_VALUE:
+                setPlainJaasConfig(mapToPopulate, context);
+                break;
+            case SCRAM_SHA256_VALUE:
+                setScramJaasConfig(mapToPopulate, context);
+                break;
+            default:
+                throw new IllegalStateException("Unknown " + SASL_MECHANISM.getDisplayName() + ": " + saslMechanism);
+        }
+    }
+
+    private static void setGssApiJaasConfig(final Map<String, Object> mapToPopulate, final ProcessContext context) {
         String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
         String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
 
@@ -369,7 +464,7 @@ final class KafkaProcessorUtils {
 
 
         String serviceName = context.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
-        if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
+        if (StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
             mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
                     + "useTicketCache=false "
                     + "renewTicket=true "
@@ -380,6 +475,32 @@ final class KafkaProcessorUtils {
         }
     }
 
+    private static void setPlainJaasConfig(final Map<String, Object> mapToPopulate, final ProcessContext context) {
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+        mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required "
+                + "username=\"" + username + "\" "
+                + "password=\"" + password + "\";");
+    }
+
+    private static void setScramJaasConfig(final Map<String, Object> mapToPopulate, final ProcessContext context) {
+        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+        final StringBuilder builder = new StringBuilder("org.apache.kafka.common.security.scram.ScramLoginModule required ")
+                .append("username=\"" + username + "\" ")
+                .append("password=\"" + password + "\"");
+
+        final Boolean tokenAuth = context.getProperty(TOKEN_AUTH).asBoolean();
+        if (tokenAuth != null && tokenAuth) {
+            builder.append(" tokenauth=\"true\"");
+        }
+
+        builder.append(";");
+        mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, builder.toString());
+    }
+
     private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
         return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
     }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
index 414523b..628d4ad 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
@@ -266,10 +266,14 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
         properties.add(ATTRIBUTE_NAME_REGEX);
         properties.add(MESSAGE_HEADER_ENCODING);
         properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
+        properties.add(KafkaProcessorUtils.SASL_MECHANISM);
         properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
         properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
         properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
         properties.add(KafkaProcessorUtils.USER_KEYTAB);
+        properties.add(KafkaProcessorUtils.USERNAME);
+        properties.add(KafkaProcessorUtils.PASSWORD);
+        properties.add(KafkaProcessorUtils.TOKEN_AUTH);
         properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
         properties.add(MESSAGE_KEY_FIELD);
         properties.add(MAX_REQUEST_SIZE);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
index dc5331d..667940f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
@@ -192,7 +192,7 @@ public class TestConsumeKafkaRecord_2_0 {
     }
 
     @Test
-    public void testJaasConfiguration() throws Exception {
+    public void testJaasConfigurationWithDefaultMechanism() {
         runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
@@ -213,4 +213,58 @@ public class TestConsumeKafkaRecord_2_0 {
         runner.assertValid();
     }
 
+    @Test
+    public void testJaasConfigurationWithPlainMechanism() {
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+
+        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.SASL_MECHANISM, KafkaProcessorUtils.PLAIN_VALUE);
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USERNAME, "user1");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.PASSWORD, "password");
+        runner.assertValid();
+
+        runner.removeProperty(KafkaProcessorUtils.USERNAME);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testJaasConfigurationWithScramMechanism() {
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+
+        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.SASL_MECHANISM, KafkaProcessorUtils.SCRAM_SHA256_VALUE);
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USERNAME, "user1");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.PASSWORD, "password");
+        runner.assertValid();
+
+        runner.removeProperty(KafkaProcessorUtils.USERNAME);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testNonSaslSecurityProtocol() {
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+
+        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_PLAINTEXT);
+        runner.assertValid();
+    }
+
 }


Mime
View raw message