nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbe...@apache.org
Subject [2/3] nifi git commit: NIFI-4917: Externalize Keytab and Principal configuration from Processors to a Controller Service. This gives us the ability to allow users to interact with those Keytabs/Principals to which they've been given access without allowi
Date Wed, 21 Mar 2018 18:24:59 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 7ac330e..d835607 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
@@ -49,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 final class KafkaProcessorUtils {
+    private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
 
     final Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -86,7 +88,7 @@ final class KafkaProcessorUtils {
             .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL)
             .defaultValue(SEC_PLAINTEXT.getValue())
             .build();
-    static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder()
             .name("sasl.kerberos.service.name")
             .displayName("Kerberos Service Name")
             .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. "
@@ -121,12 +123,20 @@ final class KafkaProcessorUtils {
             .required(false)
             .identifiesControllerService(SSLContextService.class)
             .build();
+    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+        .name("kerberos-credentials-service")
+        .displayName("Kerberos Credentials Service")
+        .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
+        .identifiesControllerService(KerberosCredentialsService.class)
+        .required(false)
+        .build();
 
     static List<PropertyDescriptor> getCommonPropertyDescriptors() {
         return Arrays.asList(
                 BOOTSTRAP_SERVERS,
                 SECURITY_PROTOCOL,
-                KERBEROS_PRINCIPLE,
+                JAAS_SERVICE_NAME,
+                KERBEROS_CREDENTIALS_SERVICE,
                 USER_PRINCIPAL,
                 USER_KEYTAB,
                 SSL_CONTEXT_SERVICE
@@ -138,71 +148,107 @@ final class KafkaProcessorUtils {
 
         String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
 
-        /*
-         * validates that if one of SASL (Kerberos) option is selected for
-         * security protocol, then Kerberos principal is provided as well
-         */
+        final String explicitPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        final String explicitKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
+        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        final String resolvedPrincipal;
+        final String resolvedKeytab;
+        if (credentialsService == null) {
+            resolvedPrincipal = explicitPrincipal;
+            resolvedKeytab = explicitKeytab;
+        } else {
+            resolvedPrincipal = credentialsService.getPrincipal();
+            resolvedKeytab = credentialsService.getKeytab();
+        }
+
+        if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
+            results.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
+                .build());
+        }
+
+        final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
+        if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
+            results.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
+                    + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
+                .build());
+        }
+
+        // Validates that if one of the SASL (Kerberos) options is selected for the
+        // security protocol, then the Kerberos service name is provided as well
         if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
-            String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
-            if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) {
-                results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
-                        .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
-                                + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '"
-                                + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
-                        .build());
+            String jaasServiceName = validationContext.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
+            if (jaasServiceName == null || jaasServiceName.trim().length() == 0) {
+                results.add(new ValidationResult.Builder().subject(JAAS_SERVICE_NAME.getDisplayName()).valid(false)
+                    .explanation("The <" + JAAS_SERVICE_NAME.getDisplayName() + "> property must be set when <"
+                        + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '"
+                        + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
+                    .build());
             }
 
-            String userKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
-            String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
-            if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal))
-                    || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) {
-                results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
-                        .explanation("Both <" + USER_KEYTAB.getDisplayName()  + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
-                                + "must be set.")
-                        .build());
+            if ((resolvedKeytab == null && resolvedPrincipal != null) || (resolvedKeytab != null && resolvedPrincipal == null)) {
+                results.add(new ValidationResult.Builder()
+                    .subject(JAAS_SERVICE_NAME.getDisplayName())
+                    .valid(false)
+                    .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
+                        + "must be set or neither must be set.")
+                    .build());
             }
         }
 
-        //If SSL or SASL_SSL then CS must be set.
+        // If SSL or SASL_SSL then SSLContext Controller Service must be set.
         final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol);
         final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
         if (csSet && !sslProtocol) {
-            results.add(new ValidationResult.Builder().subject(SECURITY_PROTOCOL.getDisplayName()).valid(false)
-                    .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.").build());
+            results.add(new ValidationResult.Builder()
+                .subject(SECURITY_PROTOCOL.getDisplayName())
+                .valid(false)
+                .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.")
+                .build());
         }
+
         if (!csSet && sslProtocol) {
-            results.add(new ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false)
-                    .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service").build());
+            results.add(new ValidationResult.Builder()
+                .subject(SSL_CONTEXT_SERVICE.getDisplayName())
+                .valid(false)
+                .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service")
+                .build());
         }
 
         final String enableAutoCommit = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue();
         if (enableAutoCommit != null && !enableAutoCommit.toLowerCase().equals("false")) {
             results.add(new ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
-                    .explanation("Enable auto commit must be false.  It is managed by the processor.").build());
+                .explanation("Enable auto commit must be false. It is managed by the processor.").build());
         }
 
         final String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue();
         if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) {
             results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                    .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build());
+                .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build());
         }
 
         final String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue();
         if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) {
             results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
-                    .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build());
+                .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build());
         }
 
         final String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue();
         if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) {
             results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)
-                    .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build());
+                .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build());
         }
 
         final String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue();
         if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) {
             results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)
-                    .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build());
+                .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build());
         }
 
         return results;
@@ -297,7 +343,17 @@ final class KafkaProcessorUtils {
     private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
         String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
         String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
-        String serviceName = context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
+
+        // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
+        // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
+        final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+        if (credentialsService != null) {
+            principal = credentialsService.getPrincipal();
+            keytab = credentialsService.getKeytab();
+        }
+
+
+        String serviceName = context.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
         if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
             mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
                     + "useTicketCache=false "

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
index 9e66405..b86f60b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
@@ -216,7 +216,8 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
         properties.add(RECORD_READER);
         properties.add(RECORD_WRITER);
         properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
-        properties.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE);
+        properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
+        properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
         properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
         properties.add(KafkaProcessorUtils.USER_KEYTAB);
         properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 69dc7de..2a5add9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -104,7 +104,7 @@ public class ConsumeKafkaTest {
         runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
         runner.assertValid();
 
         runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
@@ -121,7 +121,7 @@ public class ConsumeKafkaTest {
         runner.setVariable("service", "kafka");
         runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
         runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
-        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}");
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}");
         runner.assertValid();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
index 569a887..08f135c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
@@ -200,7 +200,7 @@ public class TestConsumeKafkaRecord_0_10 {
         runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
         runner.assertValid();
 
         runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
@@ -217,7 +217,7 @@ public class TestConsumeKafkaRecord_0_10 {
         runner.setVariable("service", "kafka");
         runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
         runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
-        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}");
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}");
         runner.assertValid();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/pom.xml
index 8ea8884..5868e09 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/pom.xml
@@ -49,6 +49,10 @@
             <artifactId>nifi-ssl-context-service-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>${kafka11.version}</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java
index 5a450c4..c4b0920 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java
@@ -220,7 +220,8 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor {
         descriptors.add(RECORD_WRITER);
         descriptors.add(HONOR_TRANSACTIONS);
         descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
-        descriptors.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE);
+        descriptors.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
+        descriptors.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
         descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL);
         descriptors.add(KafkaProcessorUtils.USER_KEYTAB);
         descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 7ac330e..e88f3da 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
@@ -49,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 final class KafkaProcessorUtils {
+    private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
 
     final Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -86,7 +88,7 @@ final class KafkaProcessorUtils {
             .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL)
             .defaultValue(SEC_PLAINTEXT.getValue())
             .build();
-    static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder()
             .name("sasl.kerberos.service.name")
             .displayName("Kerberos Service Name")
             .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. "
@@ -121,12 +123,20 @@ final class KafkaProcessorUtils {
             .required(false)
             .identifiesControllerService(SSLContextService.class)
             .build();
+    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+        .name("kerberos-credentials-service")
+        .displayName("Kerberos Credentials Service")
+        .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
+        .identifiesControllerService(KerberosCredentialsService.class)
+        .required(false)
+        .build();
 
     static List<PropertyDescriptor> getCommonPropertyDescriptors() {
         return Arrays.asList(
                 BOOTSTRAP_SERVERS,
                 SECURITY_PROTOCOL,
-                KERBEROS_PRINCIPLE,
+                KERBEROS_CREDENTIALS_SERVICE,
+                JAAS_SERVICE_NAME,
                 USER_PRINCIPAL,
                 USER_KEYTAB,
                 SSL_CONTEXT_SERVICE
@@ -138,71 +148,107 @@ final class KafkaProcessorUtils {
 
         String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
 
-        /*
-         * validates that if one of SASL (Kerberos) option is selected for
-         * security protocol, then Kerberos principal is provided as well
-         */
+        final String explicitPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        final String explicitKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
+        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        final String resolvedPrincipal;
+        final String resolvedKeytab;
+        if (credentialsService == null) {
+            resolvedPrincipal = explicitPrincipal;
+            resolvedKeytab = explicitKeytab;
+        } else {
+            resolvedPrincipal = credentialsService.getPrincipal();
+            resolvedKeytab = credentialsService.getKeytab();
+        }
+
+        if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
+            results.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
+                .build());
+        }
+
+        final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
+        if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
+            results.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
+                    + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
+                .build());
+        }
+
+        // Validates that if one of the SASL (Kerberos) options is selected for the
+        // security protocol, then the Kerberos service name is provided as well
         if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
-            String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
-            if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) {
-                results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
-                        .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
-                                + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '"
-                                + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
-                        .build());
+            String jaasServiceName = validationContext.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
+            if (jaasServiceName == null || jaasServiceName.trim().length() == 0) {
+                results.add(new ValidationResult.Builder().subject(JAAS_SERVICE_NAME.getDisplayName()).valid(false)
+                    .explanation("The <" + JAAS_SERVICE_NAME.getDisplayName() + "> property must be set when <"
+                        + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '"
+                        + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
+                    .build());
             }
 
-            String userKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
-            String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
-            if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal))
-                    || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) {
-                results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
-                        .explanation("Both <" + USER_KEYTAB.getDisplayName()  + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
-                                + "must be set.")
-                        .build());
+            if ((resolvedKeytab == null && resolvedPrincipal != null) || (resolvedKeytab != null && resolvedPrincipal == null)) {
+                results.add(new ValidationResult.Builder()
+                    .subject(JAAS_SERVICE_NAME.getDisplayName())
+                    .valid(false)
+                    .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
+                        + "must be set or neither must be set.")
+                    .build());
             }
         }
 
-        //If SSL or SASL_SSL then CS must be set.
+        // If SSL or SASL_SSL then SSLContext Controller Service must be set.
         final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol);
         final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
         if (csSet && !sslProtocol) {
-            results.add(new ValidationResult.Builder().subject(SECURITY_PROTOCOL.getDisplayName()).valid(false)
-                    .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.").build());
+            results.add(new ValidationResult.Builder()
+                .subject(SECURITY_PROTOCOL.getDisplayName())
+                .valid(false)
+                .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.")
+                .build());
         }
+
         if (!csSet && sslProtocol) {
-            results.add(new ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false)
-                    .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service").build());
+            results.add(new ValidationResult.Builder()
+                .subject(SSL_CONTEXT_SERVICE.getDisplayName())
+                .valid(false)
+                .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service")
+                .build());
         }
 
         final String enableAutoCommit = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue();
         if (enableAutoCommit != null && !enableAutoCommit.toLowerCase().equals("false")) {
             results.add(new ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
-                    .explanation("Enable auto commit must be false.  It is managed by the processor.").build());
+                .explanation("Enable auto commit must be false. It is managed by the processor.").build());
         }
 
         final String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue();
         if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) {
             results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                    .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build());
+                .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build());
         }
 
         final String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue();
         if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) {
             results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
-                    .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build());
+                .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build());
         }
 
         final String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue();
         if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) {
             results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)
-                    .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build());
+                .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build());
         }
 
         final String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue();
         if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) {
             results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)
-                    .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build());
+                .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build());
         }
 
         return results;
@@ -297,7 +343,17 @@ final class KafkaProcessorUtils {
     private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
         String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
         String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
-        String serviceName = context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
+
+        // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
+        // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
+        final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+        if (credentialsService != null) {
+            principal = credentialsService.getPrincipal();
+            keytab = credentialsService.getKeytab();
+        }
+
+
+        String serviceName = context.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
         if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
             mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
                     + "useTicketCache=false "

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
index d42df15..1521bfa 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
@@ -255,7 +255,8 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
         properties.add(ATTRIBUTE_NAME_REGEX);
         properties.add(MESSAGE_HEADER_ENCODING);
         properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
-        properties.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE);
+        properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
+        properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
         properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
         properties.add(KafkaProcessorUtils.USER_KEYTAB);
         properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 4778f1a..fbf3a69 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -104,7 +104,7 @@ public class ConsumeKafkaTest {
         runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
         runner.assertValid();
 
         runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
@@ -121,7 +121,7 @@ public class ConsumeKafkaTest {
         runner.setVariable("service", "kafka");
         runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
         runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
-        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}");
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}");
         runner.assertValid();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java
index 42a66b1..bb2fdba 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java
@@ -200,7 +200,7 @@ public class TestConsumeKafkaRecord_0_11 {
         runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
         runner.assertValid();
 
         runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
@@ -217,7 +217,7 @@ public class TestConsumeKafkaRecord_0_11 {
         runner.setVariable("service", "kafka");
         runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
         runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
-        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}");
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}");
         runner.assertValid();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
index b2210b3..6bf4d68 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
@@ -49,6 +49,11 @@
             <artifactId>nifi-ssl-context-service-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+        </dependency>
+        
+        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>${kafka1.0.version}</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
index f0c1bd0..a64cb8e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
@@ -220,7 +220,8 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
         descriptors.add(RECORD_WRITER);
         descriptors.add(HONOR_TRANSACTIONS);
         descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
-        descriptors.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE);
+        descriptors.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
+        descriptors.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
         descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL);
         descriptors.add(KafkaProcessorUtils.USER_KEYTAB);
         descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 7ac330e..d835607 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
@@ -49,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 final class KafkaProcessorUtils {
+    private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
 
     final Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -86,7 +88,7 @@ final class KafkaProcessorUtils {
             .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL)
             .defaultValue(SEC_PLAINTEXT.getValue())
             .build();
-    static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder()
             .name("sasl.kerberos.service.name")
             .displayName("Kerberos Service Name")
             .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. "
@@ -121,12 +123,20 @@ final class KafkaProcessorUtils {
             .required(false)
             .identifiesControllerService(SSLContextService.class)
             .build();
+    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+        .name("kerberos-credentials-service")
+        .displayName("Kerberos Credentials Service")
+        .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
+        .identifiesControllerService(KerberosCredentialsService.class)
+        .required(false)
+        .build();
 
     static List<PropertyDescriptor> getCommonPropertyDescriptors() {
         return Arrays.asList(
                 BOOTSTRAP_SERVERS,
                 SECURITY_PROTOCOL,
-                KERBEROS_PRINCIPLE,
+                JAAS_SERVICE_NAME,
+                KERBEROS_CREDENTIALS_SERVICE,
                 USER_PRINCIPAL,
                 USER_KEYTAB,
                 SSL_CONTEXT_SERVICE
@@ -138,71 +148,107 @@ final class KafkaProcessorUtils {
 
         String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
 
-        /*
-         * validates that if one of SASL (Kerberos) option is selected for
-         * security protocol, then Kerberos principal is provided as well
-         */
+        final String explicitPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        final String explicitKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
+        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        final String resolvedPrincipal;
+        final String resolvedKeytab;
+        if (credentialsService == null) {
+            resolvedPrincipal = explicitPrincipal;
+            resolvedKeytab = explicitKeytab;
+        } else {
+            resolvedPrincipal = credentialsService.getPrincipal();
+            resolvedKeytab = credentialsService.getKeytab();
+        }
+
+        if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
+            results.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
+                .build());
+        }
+
+        final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
+        if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
+            results.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
+                    + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
+                .build());
+        }
+
+        // Validates that if one of the SASL (Kerberos) options is selected for the
+        // security protocol, then the Kerberos service name is provided as well
         if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
-            String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
-            if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) {
-                results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
-                        .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
-                                + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '"
-                                + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
-                        .build());
+            String jaasServiceName = validationContext.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
+            if (jaasServiceName == null || jaasServiceName.trim().length() == 0) {
+                results.add(new ValidationResult.Builder().subject(JAAS_SERVICE_NAME.getDisplayName()).valid(false)
+                    .explanation("The <" + JAAS_SERVICE_NAME.getDisplayName() + "> property must be set when <"
+                        + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '"
+                        + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
+                    .build());
             }
 
-            String userKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
-            String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
-            if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal))
-                    || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) {
-                results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
-                        .explanation("Both <" + USER_KEYTAB.getDisplayName()  + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
-                                + "must be set.")
-                        .build());
+            if ((resolvedKeytab == null && resolvedPrincipal != null) || (resolvedKeytab != null && resolvedPrincipal == null)) {
+                results.add(new ValidationResult.Builder()
+                    .subject(JAAS_SERVICE_NAME.getDisplayName())
+                    .valid(false)
+                    .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
+                        + "must be set or neither must be set.")
+                    .build());
             }
         }
 
-        //If SSL or SASL_SSL then CS must be set.
+        // If SSL or SASL_SSL then SSLContext Controller Service must be set.
         final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol);
         final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
         if (csSet && !sslProtocol) {
-            results.add(new ValidationResult.Builder().subject(SECURITY_PROTOCOL.getDisplayName()).valid(false)
-                    .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.").build());
+            results.add(new ValidationResult.Builder()
+                .subject(SECURITY_PROTOCOL.getDisplayName())
+                .valid(false)
+                .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.")
+                .build());
         }
+
         if (!csSet && sslProtocol) {
-            results.add(new ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false)
-                    .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service").build());
+            results.add(new ValidationResult.Builder()
+                .subject(SSL_CONTEXT_SERVICE.getDisplayName())
+                .valid(false)
+                .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service")
+                .build());
         }
 
         final String enableAutoCommit = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue();
         if (enableAutoCommit != null && !enableAutoCommit.toLowerCase().equals("false")) {
             results.add(new ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
-                    .explanation("Enable auto commit must be false.  It is managed by the processor.").build());
+                .explanation("Enable auto commit must be false. It is managed by the processor.").build());
         }
 
         final String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue();
         if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) {
             results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                    .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build());
+                .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build());
         }
 
         final String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue();
         if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) {
             results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
-                    .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build());
+                .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build());
         }
 
         final String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue();
         if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) {
             results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)
-                    .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build());
+                .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build());
         }
 
         final String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue();
         if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) {
             results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)
-                    .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build());
+                .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build());
         }
 
         return results;
@@ -297,7 +343,17 @@ final class KafkaProcessorUtils {
     private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
         String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
         String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
-        String serviceName = context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
+
+        // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
+        // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
+        final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+        if (credentialsService != null) {
+            principal = credentialsService.getPrincipal();
+            keytab = credentialsService.getKeytab();
+        }
+
+
+        String serviceName = context.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
         if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
             mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
                     + "useTicketCache=false "

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
index 517cb0c..e26d665 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
@@ -255,7 +255,8 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
         properties.add(ATTRIBUTE_NAME_REGEX);
         properties.add(MESSAGE_HEADER_ENCODING);
         properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
-        properties.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE);
+        properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
+        properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
         properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
         properties.add(KafkaProcessorUtils.USER_KEYTAB);
         properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index f6785f9..30e2322 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -104,7 +104,7 @@ public class ConsumeKafkaTest {
         runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
         runner.assertValid();
 
         runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
@@ -121,7 +121,7 @@ public class ConsumeKafkaTest {
         runner.setVariable("service", "kafka");
         runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
         runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
-        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}");
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}");
         runner.assertValid();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java
index d370fec..0bdfd46 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java
@@ -200,7 +200,7 @@ public class TestConsumeKafkaRecord_1_0 {
         runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
         runner.assertValid();
 
         runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
index b7aa55f..50eb70d 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
@@ -64,6 +64,10 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-schema-registry-service-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+        </dependency>
         <!-- Test dependencies -->
         <dependency>
             <groupId>org.apache.nifi</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
index e571233..6c22c3a 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
@@ -52,16 +52,11 @@ import java.io.IOException;
         @WritesAttribute(attribute = "record.count", description = "The number of records in the resulting flow file")
 })
 @SeeAlso({PutParquet.class})
-@Restricted(
-        restrictions = {
-                @Restriction(
-                        requiredPermission = RequiredPermission.READ_FILESYSTEM,
-                        explanation = "Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem."),
-                @Restriction(
-                        requiredPermission = RequiredPermission.ACCESS_KEYTAB,
-                        explanation = "Provides operator the ability to make use of any keytab and principal on the local filesystem that NiFi has access to."),
-        }
-)
+@Restricted(restrictions = {
+    @Restriction(
+        requiredPermission = RequiredPermission.READ_FILESYSTEM,
+        explanation = "Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem.")
+})
 public class FetchParquet extends AbstractFetchHDFSRecord {
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
index c3907b0..f8c555e 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
@@ -69,16 +69,11 @@ import java.util.List;
         @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file is stored in this attribute."),
         @WritesAttribute(attribute = "record.count", description = "The number of records written to the Parquet file")
 })
-@Restricted(
-        restrictions = {
-                @Restriction(
-                        requiredPermission = RequiredPermission.READ_FILESYSTEM,
-                        explanation = "Provides operator the ability to write any file that NiFi has access to in HDFS or the local filesystem."),
-                @Restriction(
-                        requiredPermission = RequiredPermission.ACCESS_KEYTAB,
-                        explanation = "Provides operator the ability to make use of any keytab and principal on the local filesystem that NiFi has access to."),
-        }
-)
+@Restricted(restrictions = {
+    @Restriction(
+        requiredPermission = RequiredPermission.READ_FILESYSTEM,
+        explanation = "Provides operator the ability to write any file that NiFi has access to in HDFS or the local filesystem.")
+})
 public class PutParquet extends AbstractPutHDFSRecord {
 
     public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
index 4fb3426..e0ef844 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
@@ -61,6 +61,11 @@
             <artifactId>nifi-record</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.hbase</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index b9655c8..ccbed60 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -54,6 +54,7 @@ import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
 import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
 import org.slf4j.Logger;
@@ -81,9 +82,19 @@ import java.util.concurrent.atomic.AtomicReference;
 @DynamicProperty(name="The name of an HBase configuration property.", value="The value of the given HBase configuration property.",
         description="These properties will be set on the HBase configuration after loading any provided configuration files.")
 public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService {
+    private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
 
     private static final Logger logger = LoggerFactory.getLogger(HBase_1_1_2_ClientService.class);
 
+    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+        .name("kerberos-credentials-service")
+        .displayName("Kerberos Credentials Service")
+        .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
+        .identifiesControllerService(KerberosCredentialsService.class)
+        .required(false)
+        .build();
+
+
     static final String HBASE_CONF_ZK_QUORUM = "hbase.zookeeper.quorum";
     static final String HBASE_CONF_ZK_PORT = "hbase.zookeeper.property.clientPort";
     static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent";
@@ -111,6 +122,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
 
         List<PropertyDescriptor> props = new ArrayList<>();
         props.add(HADOOP_CONF_FILES);
+        props.add(KERBEROS_CREDENTIALS_SERVICE);
         props.add(kerberosProperties.getKerberosPrincipal());
         props.add(kerberosProperties.getKerberosKeytab());
         props.add(ZOOKEEPER_QUORUM);
@@ -153,6 +165,20 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
         boolean znodeParentProvided = validationContext.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet();
         boolean retriesProvided = validationContext.getProperty(HBASE_CLIENT_RETRIES).isSet();
 
+        final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+        final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        final String resolvedPrincipal;
+        final String resolvedKeytab;
+        if (credentialsService == null) {
+            resolvedPrincipal = explicitPrincipal;
+            resolvedKeytab = explicitKeytab;
+        } else {
+            resolvedPrincipal = credentialsService.getPrincipal();
+            resolvedKeytab = credentialsService.getKeytab();
+        }
+
         final List<ValidationResult> problems = new ArrayList<>();
 
         if (!confFileProvided && (!zkQuorumProvided || !zkPortProvided || !znodeParentProvided || !retriesProvided)) {
@@ -177,11 +203,26 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
             }
 
             final Configuration hbaseConfig = resources.getConfiguration();
-            final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
-            final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
 
-            problems.addAll(KerberosProperties.validatePrincipalAndKeytab(
-                    this.getClass().getSimpleName(), hbaseConfig, principal, keytab, getLogger()));
+            problems.addAll(KerberosProperties.validatePrincipalAndKeytab(getClass().getSimpleName(), hbaseConfig, resolvedPrincipal, resolvedKeytab, getLogger()));
+        }
+
+        if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
+            problems.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
+                .build());
+        }
+
+        final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
+        if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
+            problems.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
+                    + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
+                .build());
         }
 
         return problems;
@@ -245,8 +286,16 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
         }
 
         if (SecurityUtil.isSecurityEnabled(hbaseConfig)) {
-            final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
-            final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+            String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+
+            // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
+            // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
+            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+            if (credentialsService != null) {
+                principal = credentialsService.getPrincipal();
+                keyTab = credentialsService.getKeytab();
+            }
 
             getLogger().info("HBase Security Enabled, logging in as principal {} with keytab {}", new Object[] {principal, keyTab});
             ugi = SecurityUtil.loginKerberos(hbaseConfig, principal, keyTab);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/.gitignore
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/.gitignore b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/pom.xml
new file mode 100644
index 0000000..bc92b9a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/pom.xml
@@ -0,0 +1,30 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-standard-services</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/src/main/java/org/apache/nifi/kerberos/KerberosCredentialsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/src/main/java/org/apache/nifi/kerberos/KerberosCredentialsService.java b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/src/main/java/org/apache/nifi/kerberos/KerberosCredentialsService.java
new file mode 100644
index 0000000..d2df3b7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/src/main/java/org/apache/nifi/kerberos/KerberosCredentialsService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kerberos;
+
+import org.apache.nifi.controller.ControllerService;
+
+public interface KerberosCredentialsService extends ControllerService {
+
+    /**
+     * Returns the path to the configured Keytab file. Callers pair this value
+     * with {@link #getPrincipal()} when performing a Kerberos login.
+     * @return the path to the configured Keytab file
+     */
+    String getKeytab();
+
+    /**
+     * Returns the configured Principal to use when authenticating with Kerberos.
+     * Callers pair this value with {@link #getKeytab()} when performing a Kerberos login.
+     * @return the configured Principal to use when authenticating with Kerberos
+     */
+    String getPrincipal();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service-nar/pom.xml
new file mode 100644
index 0000000..8832217
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service-nar/pom.xml
@@ -0,0 +1,37 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-kerberos-credentials-service-bundle</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-kerberos-credentials-service-nar</artifactId>
+    <packaging>nar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>1.6.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service</artifactId>
+            <version>1.6.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service/.gitignore
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service/.gitignore b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service/.gitignore
@@ -0,0 +1 @@
+/bin/


Mime
View raw message