flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [16/20] flink git commit: [FLINK-2268] Dynamically load Hadoop security module when available
Date Wed, 27 Sep 2017 11:09:21 GMT
[FLINK-2268] Dynamically load Hadoop security module when available


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f1c2331
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f1c2331
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f1c2331

Branch: refs/heads/master
Commit: 7f1c23317453859ce3b136b6e13f698d3fee34a1
Parents: ed11548
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Thu Aug 24 14:22:26 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Sep 27 10:05:11 2017 +0200

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkSecuredITCase.java |   7 +-
 .../java/hadoop/mapred/utils/HadoopUtils.java   |  21 ----
 .../flink/runtime/security/KerberosUtils.java   |  12 +-
 .../flink/runtime/security/SecurityUtils.java   |  88 +++++++-------
 .../runtime/security/modules/HadoopModule.java  |  29 ++++-
 .../security/modules/HadoopModuleFactory.java   |  57 +++++++++
 .../runtime/security/modules/JaasModule.java    |  10 +-
 .../security/modules/JaasModuleFactory.java     |  32 +++++
 .../security/modules/SecurityModule.java        |   5 +-
 .../security/modules/SecurityModuleFactory.java |  37 ++++++
 .../security/modules/ZooKeeperModule.java       |  20 ++-
 .../modules/ZookeeperModuleFactory.java         |  32 +++++
 .../apache/flink/runtime/util/HadoopUtils.java  | 121 +++++++++++++++++++
 .../runtime/security/SecurityUtilsTest.java     |  38 +++---
 .../flink/test/util/TestingSecurityContext.java |  21 ++--
 .../yarn/YARNSessionFIFOSecuredITCase.java      |  14 ++-
 .../main/java/org/apache/flink/yarn/Utils.java  |  11 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  13 +-
 .../flink/yarn/YarnTaskExecutorRunner.java      |  13 +-
 .../flink/yarn/YarnTaskManagerRunner.java       |  24 ++--
 .../yarn/entrypoint/YarnEntrypointUtils.java    |  13 +-
 21 files changed, 474 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 866b2f3..c9d13dc 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestBaseUtils;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -124,7 +126,10 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 		flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
 				SecureTestEnvironment.getHadoopServicePrincipal());
 
-		SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig, conf);
+		SecurityUtils.SecurityConfiguration ctx =
+			new SecurityUtils.SecurityConfiguration(
+				flinkConfig,
+				Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, conf)));
 		try {
 			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
index 8760968..464dcdf 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -23,16 +23,11 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -43,8 +38,6 @@ public final class HadoopUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
 
-	private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
-
 	/**
 	 * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
 	 */
@@ -126,20 +119,6 @@ public final class HadoopUtils {
 	}
 
 	/**
-	 * Indicates whether the current user has an HDFS delegation token.
-	 */
-	public static boolean hasHDFSDelegationToken() throws Exception {
-		UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
-		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
-		for (Token<? extends TokenIdentifier> token : usrTok) {
-			if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-	/**
 	 * Private constructor to prevent instantiation.
 	 */
 	private HadoopUtils() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
index 5662d29..4ce5d81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.security;
 
 import org.apache.flink.annotation.Internal;
 
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +51,13 @@ public class KerberosUtils {
 
 	private static final AppConfigurationEntry userKerberosAce;
 
+	/* Return the Kerberos login module name */
+	public static String getKrb5LoginModuleName() {
+		return System.getProperty("java.vendor").contains("IBM")
+			? "com.ibm.security.auth.module.Krb5LoginModule"
+			: "com.sun.security.auth.module.Krb5LoginModule";
+	}
+
 	static {
 
 		IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
@@ -80,7 +86,7 @@ public class KerberosUtils {
 		kerberosCacheOptions.putAll(debugOptions);
 
 		userKerberosAce = new AppConfigurationEntry(
-				KerberosUtil.getKrb5LoginModuleName(),
+				getKrb5LoginModuleName(),
 				AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
 				kerberosCacheOptions);
 
@@ -112,7 +118,7 @@ public class KerberosUtils {
 		keytabKerberosOptions.putAll(debugOptions);
 
 		AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
-				KerberosUtil.getKrb5LoginModuleName(),
+				getKrb5LoginModuleName(),
 				AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
 				keytabKerberosOptions);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index bdaaed6..dd5ac0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -19,16 +19,15 @@
 package org.apache.flink.runtime.security;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.runtime.security.modules.HadoopModule;
-import org.apache.flink.runtime.security.modules.JaasModule;
+import org.apache.flink.runtime.security.modules.HadoopModuleFactory;
+import org.apache.flink.runtime.security.modules.JaasModuleFactory;
 import org.apache.flink.runtime.security.modules.SecurityModule;
-import org.apache.flink.runtime.security.modules.ZooKeeperModule;
+import org.apache.flink.runtime.security.modules.SecurityModuleFactory;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.runtime.security.modules.ZookeeperModuleFactory;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -41,8 +40,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Utils for configuring security. The following security subsystems are supported:
  * 1. Java Authentication and Authorization Service (JAAS)
@@ -76,10 +73,13 @@ public class SecurityUtils {
 		// install the security modules
 		List<SecurityModule> modules = new ArrayList<>();
 		try {
-			for (Class<? extends SecurityModule> moduleClass : config.getSecurityModules()) {
-				SecurityModule module = moduleClass.newInstance();
-				module.install(config);
-				modules.add(module);
+			for (SecurityModuleFactory moduleFactory : config.getSecurityModuleFactories()) {
+				SecurityModule module = moduleFactory.createModule(config);
+				// can be null if a SecurityModule is not supported in the current environment
+				if (module != null) {
+					module.install();
+					modules.add(module);
+				}
 			}
 		}
 		catch (Exception ex) {
@@ -87,18 +87,32 @@ public class SecurityUtils {
 		}
 		installedModules = modules;
 
-		// install a security context
-		// use the Hadoop login user as the subject of the installed security context
-		if (!(installedContext instanceof NoOpSecurityContext)) {
-			LOG.warn("overriding previous security context");
+		// First check if we have Hadoop in the ClassPath. If not, we simply don't do anything.
+		try {
+			Class.forName(
+				"org.apache.hadoop.security.UserGroupInformation",
+				false,
+				SecurityUtils.class.getClassLoader());
+
+			// install a security context
+			// use the Hadoop login user as the subject of the installed security context
+			if (!(installedContext instanceof NoOpSecurityContext)) {
+				LOG.warn("overriding previous security context");
+			}
+			UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+			installedContext = new HadoopSecurityContext(loginUser);
+		} catch (ClassNotFoundException e) {
+			LOG.info("Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.");
+		} catch (LinkageError e) {
+			LOG.error("Cannot install HadoopSecurityContext.", e);
 		}
-		UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-		installedContext = new HadoopSecurityContext(loginUser);
 	}
 
 	static void uninstall() {
 		if (installedModules != null) {
-			for (SecurityModule module : Lists.reverse(installedModules)) {
+			// uninstall them in reverse order
+			for (int i = installedModules.size() - 1; i >= 0; i--) {
+				SecurityModule module = installedModules.get(i);
 				try {
 					module.uninstall();
 				}
@@ -121,12 +135,12 @@ public class SecurityUtils {
 	 */
 	public static class SecurityConfiguration {
 
-		private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList(
-			Arrays.asList(HadoopModule.class, JaasModule.class, ZooKeeperModule.class));
+		private static final List<SecurityModuleFactory> DEFAULT_MODULES = Collections.unmodifiableList(
+			Arrays.asList(new HadoopModuleFactory(), new JaasModuleFactory(), new ZookeeperModuleFactory()));
 
-		private final List<Class<? extends SecurityModule>> securityModules;
+		private final List<SecurityModuleFactory> securityModuleFactories;
 
-		private final org.apache.hadoop.conf.Configuration hadoopConf;
+		private final Configuration flinkConfig;
 
 		private final boolean isZkSaslDisable;
 
@@ -147,28 +161,16 @@ public class SecurityUtils {
 		 * @param flinkConf the Flink global configuration.
          */
 		public SecurityConfiguration(Configuration flinkConf) {
-			this(flinkConf, HadoopUtils.getHadoopConfiguration(flinkConf));
+			this(flinkConf, DEFAULT_MODULES);
 		}
 
 		/**
 		 * Create a security configuration from the global configuration.
 		 * @param flinkConf the Flink global configuration.
-		 * @param hadoopConf the Hadoop configuration.
-		 */
-		public SecurityConfiguration(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) {
-			this(flinkConf, hadoopConf, DEFAULT_MODULES);
-		}
-
-		/**
-		 * Create a security configuration from the global configuration.
-		 * @param flinkConf the Flink global configuration.
-		 * @param hadoopConf the Hadoop configuration.
-		 * @param securityModules the security modules to apply.
+		 * @param securityModuleFactories the security modules to apply.
 		 */
 		public SecurityConfiguration(Configuration flinkConf,
-				org.apache.hadoop.conf.Configuration hadoopConf,
-				List<? extends Class<? extends SecurityModule>> securityModules) {
-			this.hadoopConf = checkNotNull(hadoopConf);
+				List<SecurityModuleFactory> securityModuleFactories) {
 			this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
 			this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
 			this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
@@ -176,8 +178,8 @@ public class SecurityUtils {
 			this.loginContextNames = parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
 			this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
 			this.zkLoginContextName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
-			this.securityModules = Collections.unmodifiableList(securityModules);
-
+			this.securityModuleFactories = Collections.unmodifiableList(securityModuleFactories);
+			this.flinkConfig = checkNotNull(flinkConf);
 			validate();
 		}
 
@@ -197,12 +199,12 @@ public class SecurityUtils {
 			return useTicketCache;
 		}
 
-		public org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
-			return hadoopConf;
+		public Configuration getFlinkConfig() {
+			return flinkConfig;
 		}
 
-		public List<Class<? extends SecurityModule>> getSecurityModules() {
-			return securityModules;
+		public List<SecurityModuleFactory> getSecurityModuleFactories() {
+			return securityModuleFactories;
 		}
 
 		public List<String> getLoginContextNames() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 05be314..0020a82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.runtime.security.modules;
 
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.util.HadoopUtils;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -37,6 +38,8 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Collection;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Responsible for installing a Hadoop login user.
  */
@@ -44,12 +47,23 @@ public class HadoopModule implements SecurityModule {
 
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopModule.class);
 
-	UserGroupInformation loginUser;
+	private final SecurityUtils.SecurityConfiguration securityConfig;
+
+	private final Configuration hadoopConfiguration;
+
+	public HadoopModule(
+		SecurityUtils.SecurityConfiguration securityConfiguration,
+		Configuration hadoopConfiguration) {
+		this.securityConfig = checkNotNull(securityConfiguration);
+		this.hadoopConfiguration = checkNotNull(hadoopConfiguration);
+	}
 
 	@Override
-	public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+	public void install() throws SecurityInstallException {
+
+		UserGroupInformation.setConfiguration(hadoopConfiguration);
 
-		UserGroupInformation.setConfiguration(securityConfig.getHadoopConfiguration());
+		UserGroupInformation loginUser;
 
 		try {
 			if (UserGroupInformation.isSecurityEnabled() &&
@@ -70,8 +84,11 @@ public class HadoopModule implements SecurityModule {
 					try {
 						Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
 							File.class, org.apache.hadoop.conf.Configuration.class);
-						Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
-							securityConfig.getHadoopConfiguration());
+						Credentials cred =
+							(Credentials) readTokenStorageFileMethod.invoke(
+								null,
+								new File(fileLocation),
+								hadoopConfiguration);
 
 						// if UGI uses Kerberos keytabs for login, do not load HDFS delegation token since
 						// the UGI would prefer the delegation token instead, which eventually expires

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModuleFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModuleFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModuleFactory.java
new file mode 100644
index 0000000..fc7aac9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModuleFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link SecurityModuleFactory} for {@link HadoopModule}. This checks if Hadoop dependencies
+ * are available before creating a {@link HadoopModule}.
+ */
+public class HadoopModuleFactory implements SecurityModuleFactory {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopModuleFactory.class);
+
+	@Override
+	public SecurityModule createModule(SecurityUtils.SecurityConfiguration securityConfig) {
+		// First check if we have Hadoop in the ClassPath. If not, we simply don't do anything.
+		try {
+			Class.forName(
+				"org.apache.hadoop.conf.Configuration",
+				false,
+				HadoopModule.class.getClassLoader());
+		} catch (ClassNotFoundException e) {
+			LOG.info("Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.");
+			return null;
+		}
+
+		try {
+			Configuration hadoopConfiguration = HadoopUtils.getHadoopConfiguration(securityConfig.getFlinkConfig());
+			return new HadoopModule(securityConfig, hadoopConfiguration);
+		} catch (LinkageError e) {
+			LOG.error("Cannot create Hadoop Security Module.", e);
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
index 91411a9..c1d1ecc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
@@ -35,6 +35,8 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Responsible for installing a process-wide JAAS configuration.
  *
@@ -57,13 +59,19 @@ public class JaasModule implements SecurityModule {
 
 	static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf";
 
+	private final SecurityUtils.SecurityConfiguration securityConfig;
+
 	private String priorConfigFile;
 	private javax.security.auth.login.Configuration priorConfig;
 
 	private DynamicConfiguration currentConfig;
 
+	public JaasModule(SecurityUtils.SecurityConfiguration securityConfig) {
+		this.securityConfig = checkNotNull(securityConfig);
+	}
+
 	@Override
-	public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+	public void install() throws SecurityInstallException {
 
 		// ensure that a config file is always defined, for compatibility with
 		// ZK and Kafka which check for the system property and existence of the file

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModuleFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModuleFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModuleFactory.java
new file mode 100644
index 0000000..7017a0d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModuleFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+/**
+ * A {@link SecurityModuleFactory} for {@link JaasModule}.
+ */
+public class JaasModuleFactory implements SecurityModuleFactory {
+
+	@Override
+	public SecurityModule createModule(SecurityUtils.SecurityConfiguration securityConfig) {
+		return new JaasModule(securityConfig);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
index 1a335df..00c2c4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.security.modules;
 
-import org.apache.flink.runtime.security.SecurityUtils;
-
 import java.security.GeneralSecurityException;
 
 /**
@@ -30,10 +28,9 @@ public interface SecurityModule {
 	/**
 	 * Install the security module.
 	 *
-	 * @param configuration the security configuration.
 	 * @throws SecurityInstallException if the security module couldn't be installed.
 	 */
-	void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException;
+	void install() throws SecurityInstallException;
 
 	/**
 	 * Uninstall the security module.

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModuleFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModuleFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModuleFactory.java
new file mode 100644
index 0000000..d345885
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModuleFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+/**
+ * A factory for a {@link SecurityModule}. A factory can determine whether a {@link SecurityModule}
+ * works in the given environment (for example, it can check whether Hadoop dependencies are
+ * available) and can then create (or not) a module based on that.
+ */
+@FunctionalInterface
+public interface SecurityModuleFactory {
+
+	/**
+	 * Creates and returns a {@link SecurityModule}. This can return {@code null} if the type
+	 * of {@link SecurityModule} that this factory can create does not work in the current
+	 * environment.
+	 */
+	SecurityModule createModule(SecurityUtils.SecurityConfiguration securityConfig);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
index af2c1f8..cf49c0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.security.modules;
 
 import org.apache.flink.runtime.security.SecurityUtils;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Responsible for installing a process-wide ZooKeeper security configuration.
  */
@@ -42,26 +44,32 @@ public class ZooKeeperModule implements SecurityModule {
 	 */
 	private static final String ZK_LOGIN_CONTEXT_NAME = "zookeeper.sasl.clientconfig";
 
+	private final SecurityUtils.SecurityConfiguration securityConfig;
+
 	private String priorSaslEnable;
 
 	private String priorServiceName;
 
 	private String priorLoginContextName;
 
+	public ZooKeeperModule(SecurityUtils.SecurityConfiguration securityConfig) {
+		this.securityConfig = checkNotNull(securityConfig);
+	}
+
 	@Override
-	public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+	public void install() throws SecurityInstallException {
 
 		priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
-		System.setProperty(ZK_ENABLE_CLIENT_SASL, String.valueOf(!configuration.isZkSaslDisable()));
+		System.setProperty(ZK_ENABLE_CLIENT_SASL, String.valueOf(!securityConfig.isZkSaslDisable()));
 
 		priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
-		if (!"zookeeper".equals(configuration.getZooKeeperServiceName())) {
-			System.setProperty(ZK_SASL_CLIENT_USERNAME, configuration.getZooKeeperServiceName());
+		if (!"zookeeper".equals(securityConfig.getZooKeeperServiceName())) {
+			System.setProperty(ZK_SASL_CLIENT_USERNAME, securityConfig.getZooKeeperServiceName());
 		}
 
 		priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
-		if (!"Client".equals(configuration.getZooKeeperLoginContextName())) {
-			System.setProperty(ZK_LOGIN_CONTEXT_NAME, configuration.getZooKeeperLoginContextName());
+		if (!"Client".equals(securityConfig.getZooKeeperLoginContextName())) {
+			System.setProperty(ZK_LOGIN_CONTEXT_NAME, securityConfig.getZooKeeperLoginContextName());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZookeeperModuleFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZookeeperModuleFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZookeeperModuleFactory.java
new file mode 100644
index 0000000..0e259b8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZookeeperModuleFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+/**
+ * A {@link SecurityModuleFactory} for {@link ZooKeeperModule}.
+ */
+public class ZookeeperModuleFactory implements SecurityModuleFactory {
+
+	@Override
+	public SecurityModule createModule(SecurityUtils.SecurityConfiguration securityConfig) {
+		return new ZooKeeperModule(securityConfig);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
new file mode 100644
index 0000000..ca0630c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collection;
+
+/**
+ * Utility class for working with Hadoop-related classes. This should only be used if Hadoop
+ * is on the classpath.
+ */
+public class HadoopUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
+
+	private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+
+	public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {
+
+		Configuration result = new Configuration();
+		boolean foundHadoopConfiguration = false;
+
+		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
+		// the hdfs configuration
+		// Try to load HDFS configuration from Hadoop's own configuration files
+		// 1. approach: Flink configuration
+		final String hdfsDefaultPath =
+			flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
+
+		if (hdfsDefaultPath != null) {
+			result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
+			LOG.debug("Using hdfs-default configuration-file path form Flink config: {}", hdfsDefaultPath);
+			foundHadoopConfiguration = true;
+		} else {
+			LOG.debug("Cannot find hdfs-default configuration-file path in Flink config.");
+		}
+
+		final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+		if (hdfsSitePath != null) {
+			result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
+			LOG.debug("Using hdfs-site configuration-file path form Flink config: {}", hdfsSitePath);
+			foundHadoopConfiguration = true;
+		} else {
+			LOG.debug("Cannot find hdfs-site configuration-file path in Flink config.");
+		}
+
+		// 2. Approach environment variables
+		String[] possibleHadoopConfPaths = new String[4];
+		possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
+
+		if (System.getenv("HADOOP_HOME") != null) {
+			possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME") + "/conf";
+			possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME") + "/etc/hadoop"; // hadoop 2.2
+		}
+
+		for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+			if (possibleHadoopConfPath != null) {
+				if (new File(possibleHadoopConfPath).exists()) {
+					if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
+						result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
+						LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
+						foundHadoopConfiguration = true;
+					}
+					if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
+						result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
+						LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
+						foundHadoopConfiguration = true;
+					}
+				}
+			}
+		}
+
+		if (!foundHadoopConfiguration) {
+			LOG.debug("Could not find Hadoop configuration via any of the supported methods " +
+				"(Flink configuration, environment variables).");
+		}
+
+		return result;
+	}
+
+	/**
+	 * Indicates whether the current user has an HDFS delegation token.
+	 */
+	public static boolean hasHDFSDelegationToken() throws Exception {
+		UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
+		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+		for (Token<? extends TokenIdentifier> token : usrTok) {
+			if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
+				return true;
+			}
+		}
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index c179f6f..30092e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.security;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.modules.SecurityModule;
+import org.apache.flink.runtime.security.modules.SecurityModuleFactory;
 
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -43,7 +44,7 @@ public class SecurityUtilsTest {
 		boolean installed;
 
 		@Override
-		public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+		public void install() throws SecurityInstallException {
 			installed = true;
 		}
 
@@ -51,6 +52,13 @@ public class SecurityUtilsTest {
 		public void uninstall() throws SecurityInstallException {
 			installed = false;
 		}
+
+		static class Factory implements SecurityModuleFactory {
+			@Override
+			public SecurityModule createModule(SecurityUtils.SecurityConfiguration securityConfig) {
+				return new TestSecurityModule();
+			}
+		}
 	}
 
 	@AfterClass
@@ -61,8 +69,8 @@ public class SecurityUtilsTest {
 	@Test
 	public void testModuleInstall() throws Exception {
 		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(
-			new Configuration(), new org.apache.hadoop.conf.Configuration(),
-			Collections.singletonList(TestSecurityModule.class));
+			new Configuration(),
+			Collections.singletonList(new TestSecurityModule.Factory()));
 
 		SecurityUtils.install(sc);
 		assertEquals(1, SecurityUtils.getInstalledModules().size());
@@ -77,8 +85,8 @@ public class SecurityUtilsTest {
 	@Test
 	public void testSecurityContext() throws Exception {
 		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(
-			new Configuration(), new org.apache.hadoop.conf.Configuration(),
-			Collections.singletonList(TestSecurityModule.class));
+			new Configuration(),
+			Collections.singletonList(new TestSecurityModule.Factory()));
 
 		SecurityUtils.install(sc);
 		assertEquals(HadoopSecurityContext.class, SecurityUtils.getInstalledContext().getClass());
@@ -100,8 +108,8 @@ public class SecurityUtilsTest {
 		testFlinkConf = new Configuration();
 		testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Foo bar,Client");
 		testSecurityConf = new SecurityUtils.SecurityConfiguration(
-			testFlinkConf, new org.apache.hadoop.conf.Configuration(),
-			Collections.singletonList(TestSecurityModule.class));
+			testFlinkConf,
+			Collections.singletonList(new TestSecurityModule.Factory()));
 		assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
 
 		// ------- with whitespaces surrounding comma
@@ -109,8 +117,8 @@ public class SecurityUtilsTest {
 		testFlinkConf = new Configuration();
 		testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Foo bar , Client");
 		testSecurityConf = new SecurityUtils.SecurityConfiguration(
-			testFlinkConf, new org.apache.hadoop.conf.Configuration(),
-			Collections.singletonList(TestSecurityModule.class));
+			testFlinkConf,
+			Collections.singletonList(new TestSecurityModule.Factory()));
 		assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
 
 		// ------- leading / trailing whitespaces at start and end of list
@@ -118,8 +126,8 @@ public class SecurityUtilsTest {
 		testFlinkConf = new Configuration();
 		testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, " Foo bar , Client ");
 		testSecurityConf = new SecurityUtils.SecurityConfiguration(
-			testFlinkConf, new org.apache.hadoop.conf.Configuration(),
-			Collections.singletonList(TestSecurityModule.class));
+			testFlinkConf,
+			Collections.singletonList(new TestSecurityModule.Factory()));
 		assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
 
 		// ------- empty entries
@@ -127,8 +135,8 @@ public class SecurityUtilsTest {
 		testFlinkConf = new Configuration();
 		testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Foo bar,,Client");
 		testSecurityConf = new SecurityUtils.SecurityConfiguration(
-			testFlinkConf, new org.apache.hadoop.conf.Configuration(),
-			Collections.singletonList(TestSecurityModule.class));
+			testFlinkConf,
+			Collections.singletonList(new TestSecurityModule.Factory()));
 		assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
 
 		// ------- empty trailing String entries with whitespaces
@@ -136,8 +144,8 @@ public class SecurityUtilsTest {
 		testFlinkConf = new Configuration();
 		testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Foo bar, ,, Client,");
 		testSecurityConf = new SecurityUtils.SecurityConfiguration(
-			testFlinkConf, new org.apache.hadoop.conf.Configuration(),
-			Collections.singletonList(TestSecurityModule.class));
+			testFlinkConf,
+			Collections.singletonList(new TestSecurityModule.Factory()));
 		assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
index 34a78ae..580bfb4 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
@@ -22,14 +22,13 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.security.DynamicConfiguration;
 import org.apache.flink.runtime.security.KerberosUtils;
 import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.security.modules.JaasModule;
+import org.apache.flink.runtime.security.modules.JaasModuleFactory;
+import org.apache.flink.runtime.security.modules.SecurityModuleFactory;
 
 import javax.security.auth.login.AppConfigurationEntry;
 
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
  * Test security context to support handling both client and server principals in MiniKDC.
  * This class is used only in integration test code for connectors like Kafka, HDFS etc.,
@@ -44,11 +43,17 @@ public class TestingSecurityContext {
 		SecurityUtils.install(config);
 
 		// install dynamic JAAS entries
-		checkArgument(config.getSecurityModules().contains(JaasModule.class));
-		DynamicConfiguration jaasConf = (DynamicConfiguration) javax.security.auth.login.Configuration.getConfiguration();
-		for (Map.Entry<String, ClientSecurityConfiguration> e : clientSecurityConfigurationMap.entrySet()) {
-			AppConfigurationEntry entry = KerberosUtils.keytabEntry(e.getValue().getKeytab(), e.getValue().getPrincipal());
-			jaasConf.addAppConfigurationEntry(e.getKey(), entry);
+		for (SecurityModuleFactory factory : config.getSecurityModuleFactories()) {
+			if (factory instanceof JaasModuleFactory) {
+				DynamicConfiguration jaasConf = (DynamicConfiguration) javax.security.auth.login.Configuration.getConfiguration();
+				for (Map.Entry<String, ClientSecurityConfiguration> e : clientSecurityConfigurationMap.entrySet()) {
+					AppConfigurationEntry entry = KerberosUtils.keytabEntry(
+						e.getValue().getKeytab(),
+						e.getValue().getPrincipal());
+					jaasConf.addAppConfigurationEntry(e.getKey(), entry);
+				}
+				break;
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
index c9f120b..28d60d3 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestingSecurityContext;
 
@@ -32,6 +33,7 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.concurrent.Callable;
 
 /**
@@ -62,10 +64,16 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
 		flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
 				SecureTestEnvironment.getHadoopServicePrincipal());
 
-		SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig,
-			YARN_CONFIGURATION);
+		SecurityUtils.SecurityConfiguration securityConfig =
+			new SecurityUtils.SecurityConfiguration(
+				flinkConfig,
+				Collections.singletonList(securityConfig1 -> {
+					// manually override the Hadoop Configuration
+					return new HadoopModule(securityConfig1, YARN_CONFIGURATION);
+				}));
+
 		try {
-			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
+			TestingSecurityContext.install(securityConfig, SecureTestEnvironment.getClientSecurityConfigurationMap());
 
 			SecurityUtils.getInstalledContext().runSecured(new Callable<Object>() {
 				@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 98d27ab..32cbb64 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -21,7 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.util.HadoopUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -457,8 +457,13 @@ public final class Utils {
 			String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
 			Method readTokenStorageFileMethod = Credentials.class.getMethod(
 				"readTokenStorageFile", File.class, org.apache.hadoop.conf.Configuration.class);
-			Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
-				new SecurityUtils.SecurityConfiguration(flinkConfig).getHadoopConfiguration());
+
+			Credentials cred =
+				(Credentials) readTokenStorageFileMethod.invoke(
+					null,
+					new File(fileLocation),
+					HadoopUtils.getHadoopConfiguration(flinkConfig));
+
 			cred.writeTokenStorageToStream(dob);
 			ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
 			ctx.setTokens(securityTokens);

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 659f873..f7c9d30 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
@@ -61,6 +62,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -169,21 +171,18 @@ public class YarnApplicationMasterRunner {
 				flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
 			}
 
-			org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
+			SecurityUtils.SecurityConfiguration sc;
 
 			//To support Yarn Secure Integration Test Scenario
 			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
 			if (krb5Conf.exists() && krb5Conf.canRead()) {
 				String krb5Path = krb5Conf.getAbsolutePath();
 				LOG.info("KRB5 Conf: {}", krb5Path);
-				hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+				org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
 				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
 				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-			}
-
-			SecurityUtils.SecurityConfiguration sc;
-			if (hadoopConfiguration != null) {
-				sc = new SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration);
+				sc = new SecurityUtils.SecurityConfiguration(flinkConfig,
+					Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, hadoopConfiguration)));
 			} else {
 				sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index e0dc55d..9bcad2a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -39,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
@@ -125,21 +127,20 @@ public class YarnTaskExecutorRunner {
 			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
 					currentUser.getShortUserName(), yarnClientUsername);
 
-			org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
+			SecurityUtils.SecurityConfiguration sc;
 
 			//To support Yarn Secure Integration Test Scenario
 			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
 			if (krb5Conf.exists() && krb5Conf.canRead()) {
 				String krb5Path = krb5Conf.getAbsolutePath();
 				LOG.info("KRB5 Conf: {}", krb5Path);
-				hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+				org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
 				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
 				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-			}
 
-			SecurityUtils.SecurityConfiguration sc;
-			if (hadoopConfiguration != null) {
-				sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
+				sc = new SecurityUtils.SecurityConfiguration(configuration,
+					Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, hadoopConfiguration)));
+
 			} else {
 				sc = new SecurityUtils.SecurityConfiguration(configuration);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 265c5a6..5f60057 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
@@ -112,29 +114,29 @@ public class YarnTaskManagerRunner {
 
 		try {
 
-			org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
+			SecurityUtils.SecurityConfiguration sc;
 
 			//To support Yarn Secure Integration Test Scenario
 			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
 			if (krb5Conf.exists() && krb5Conf.canRead()) {
 				String krb5Path = krb5Conf.getAbsolutePath();
 				LOG.info("KRB5 Conf: {}", krb5Path);
-				hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+				org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
 				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
 				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-			}
 
-			// set keytab principal and replace path with the local path of the shipped keytab file in NodeManager
-			if (localKeytabPath != null && remoteKeytabPrincipal != null) {
-				configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, localKeytabPath);
-				configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
-			}
+				// set keytab principal and replace path with the local path of the shipped keytab file in NodeManager
+				if (localKeytabPath != null && remoteKeytabPrincipal != null) {
+					configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, localKeytabPath);
+					configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
+				}
+
+				sc = new SecurityUtils.SecurityConfiguration(configuration,
+					Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, hadoopConfiguration)));
 
-			SecurityUtils.SecurityConfiguration sc;
-			if (hadoopConfiguration != null) {
-				sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
 			} else {
 				sc = new SecurityUtils.SecurityConfiguration(configuration);
+
 			}
 
 			SecurityUtils.install(sc);

http://git-wip-us.apache.org/repos/asf/flink/blob/7f1c2331/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index d36e769..d01d0d0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.Utils;
 import org.apache.flink.yarn.YarnConfigKeys;
@@ -41,6 +42,7 @@ import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -52,19 +54,18 @@ public class YarnEntrypointUtils {
 	public static SecurityContext installSecurityContext(
 			Configuration configuration,
 			String workingDirectory) throws Exception {
-		org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
+
+		SecurityUtils.SecurityConfiguration sc;
 
 		//To support Yarn Secure Integration Test Scenario
 		File krb5Conf = new File(workingDirectory, Utils.KRB5_FILE_NAME);
 		if (krb5Conf.exists() && krb5Conf.canRead()) {
-			hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+			org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
 			hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
 			hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-		}
 
-		SecurityUtils.SecurityConfiguration sc;
-		if (hadoopConfiguration != null) {
-			sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
+			sc = new SecurityUtils.SecurityConfiguration(configuration,
+				Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, hadoopConfiguration)));
 		} else {
 			sc = new SecurityUtils.SecurityConfiguration(configuration);
 		}


Mime
View raw message