flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/2] flink git commit: [FLINK-4826] add keytab support to mesos container
Date Wed, 30 Nov 2016 17:31:06 GMT
Repository: flink
Updated Branches:
  refs/heads/master 8cdb406dc -> fe602eab2


[FLINK-4826] add keytab support to mesos container

This closes #2734.
This closes #2900.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/673c724e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/673c724e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/673c724e

Branch: refs/heads/master
Commit: 673c724e6bdc5c1f410ef8aae1fd1d4c72647f5a
Parents: 8cdb406
Author: Vijay Srinivasaraghavan <vijayaraghavan.srinivasaraghavan@emc.com>
Authored: Thu Oct 13 15:45:35 2016 -0700
Committer: Maximilian Michels <mxm@apache.org>
Committed: Wed Nov 30 18:32:02 2016 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           | 103 ++++++++++++++-----
 .../clusterframework/MesosConfigKeys.java       |   5 +-
 .../MesosTaskManagerRunner.java                 |  67 +++++++-----
 .../flink/runtime/security/SecurityContext.java |   1 +
 4 files changed, 124 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/673c724e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index c35fa82..bca179f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -24,6 +24,7 @@ import akka.actor.Address;
 import akka.actor.Props;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -43,6 +44,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.Hardware;
@@ -66,7 +68,6 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.URL;
 import java.net.UnknownHostException;
-import java.security.PrivilegedAction;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -124,7 +125,7 @@ public class MesosApplicationMasterRunner {
 
 	/**
 	 * The instance entry point for the Mesos AppMaster. Obtains user group
-	 * information and calls the main work method {@link #runPrivileged()} as a
+	 * information and calls the main work method {@link #runPrivileged(Configuration)} as a
 	 * privileged action.
 	 *
 	 * @param args The command line arguments.
@@ -134,20 +135,27 @@ public class MesosApplicationMasterRunner {
 		try {
 			LOG.debug("All environment variables: {}", ENV);
 
-			final UserGroupInformation currentUser;
-			try {
-				currentUser = UserGroupInformation.getCurrentUser();
-			} catch (Throwable t) {
-				throw new Exception("Cannot access UserGroupInformation information for current user", t);
-			}
+			final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
+			checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+
+			// Flink configuration
+			final Configuration dynamicProperties =
+					FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+
+			final Configuration configuration = createConfiguration(workingDir, dynamicProperties);
+
+			SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+			sc.setFlinkConfiguration(configuration);
+			sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
+			SecurityContext.install(sc);
 
-			LOG.info("Running Flink as user {}", currentUser.getShortUserName());
+			LOG.info("Running Flink as user {}", UserGroupInformation.getCurrentUser().getShortUserName());
 
-			// run the actual work in a secured privileged action
-			return currentUser.doAs(new PrivilegedAction<Integer>() {
+			return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
 				@Override
 				public Integer run() {
-					return runPrivileged();
+					return runPrivileged(configuration);
 				}
 			});
 		}
@@ -167,7 +175,7 @@ public class MesosApplicationMasterRunner {
 	 *
 	 * @return The return code for the Java process.
 	 */
-	protected int runPrivileged() {
+	protected int runPrivileged(Configuration config) {
 
 		ActorSystem actorSystem = null;
 		WebMonitor webMonitor = null;
@@ -179,7 +187,6 @@ public class MesosApplicationMasterRunner {
 		// configuration problem occurs
 
 		final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
-		checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
 
 		final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID);
 		checkState(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID);
@@ -197,13 +204,6 @@ public class MesosApplicationMasterRunner {
 			return INIT_ERROR_EXIT_CODE;
 		}
 
-		// Flink configuration
-		final Configuration dynamicProperties =
-			FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
-		LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
-
-		final Configuration config = createConfiguration(workingDir, dynamicProperties);
-
 		// Mesos configuration
 		final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
 
@@ -456,9 +456,7 @@ public class MesosApplicationMasterRunner {
 	private static Configuration createConfiguration(String baseDirectory, Configuration additional) {
 		LOG.info("Loading config from directory {}", baseDirectory);
 
-		Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
-
-		configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
+		Configuration configuration = GlobalConfiguration.loadConfiguration();
 
 		// add dynamic properties to JobManager configuration.
 		configuration.addAll(additional);
@@ -587,9 +585,6 @@ public class MesosApplicationMasterRunner {
 		String shipListString = env.get(MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
 		checkState(shipListString != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
 
-		String clientUsername = env.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
-		checkState(clientUsername != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_USERNAME);
-
 		String classPathString = env.get(MesosConfigKeys.ENV_FLINK_CLASSPATH);
 		checkState(classPathString != null, "Environment variable %s not set", MesosConfigKeys.ENV_FLINK_CLASSPATH);
 
@@ -597,6 +592,45 @@ public class MesosApplicationMasterRunner {
 		final File flinkJarFile = new File(workingDirectory, "flink.jar");
 		cmd.addUris(uri(artifactServer.addFile(flinkJarFile, "flink.jar"), true));
 
+		String hadoopConfDir = env.get("HADOOP_CONF_DIR");
+		LOG.debug("ENV: hadoopConfDir = {}", hadoopConfDir);
+
+		//upload Hadoop configurations to artifact server
+		boolean hadoopConf = false;
+		if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
+			File source = new File(hadoopConfDir);
+			if(source.exists() && source.isDirectory()) {
+				hadoopConf = true;
+				File[] fileList = source.listFiles();
+				for(File file: fileList) {
+					if(file.getName().equals("core-site.xml") || file.getName().equals("hdfs-site.xml")) {
+						LOG.debug("Adding local file: [{}] to artifact server", file);
+						File f = new File(hadoopConfDir, file.getName());
+						cmd.addUris(uri(artifactServer.addFile(f, file.getName()), true));
+					}
+				}
+			}
+		}
+
+		//upload keytab to the artifact server
+		String keytabFileName = null;
+		String keytab = flinkConfig.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+		if(keytab != null) {
+			File source = new File(keytab);
+			if(source.exists()) {
+				LOG.debug("Adding keytab file: [{}] to artifact server", source);
+				keytabFileName = source.getName();
+				cmd.addUris(uri(artifactServer.addFile(source, source.getName()), true));
+			}
+		}
+
+		String principal = flinkConfig.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+		if(keytabFileName != null && principal != null) {
+			//reset the configurations since we will use in-memory reference from within the TM instance
+			taskManagerConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,"");
+			taskManagerConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,"");
+		}
+
 		// register the TaskManager configuration
 		final File taskManagerConfigFile =
 			new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
@@ -630,7 +664,20 @@ public class MesosApplicationMasterRunner {
 			envBuilder.addVariables(variable(entry.getKey(), entry.getValue()));
 		}
 		envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLASSPATH, classPathString));
-		envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLIENT_USERNAME, clientUsername));
+
+		//add hadoop config directory to the environment
+		if(hadoopConf) {
+			envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_CONF_DIR, "."));
+		}
+
+		//add keytab and principal to environment
+		if(keytabFileName != null && principal != null) {
+			envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB, keytabFileName));
+			envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL, principal));
+		}
+
+		envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_USER_NAME,
+				UserGroupInformation.getCurrentUser().getUserName()));
 
 		cmd.setEnvironment(envBuilder);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/673c724e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index 9413c68..bc6dde4 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -30,7 +30,7 @@ public class MesosConfigKeys {
 	public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
 	public static final String ENV_SLOTS = "_SLOTS";
 	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
+	public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
 	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
 	public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
 	public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";
@@ -38,6 +38,9 @@ public class MesosConfigKeys {
 	public static final String ENV_CLASSPATH = "CLASSPATH";
 	public static final String ENV_MESOS_SANDBOX = "MESOS_SANDBOX";
 	public static final String ENV_SESSION_ID = "_CLIENT_SESSION_ID";
+	public static final String ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
+	public static final String ENV_KEYTAB = "_KEYTAB_FILE";
+	public static final String ENV_KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
 
 	/** Private constructor to prevent instantiation */
 	private MesosConfigKeys() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/673c724e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index ddc2097..d7544a0 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -18,14 +18,16 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
+import java.io.File;
 import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.util.Map;
 
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
@@ -33,8 +35,6 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,7 +73,7 @@ public class MesosTaskManagerRunner {
 
 		// read the environment variables
 		final Map<String, String> envs = System.getenv();
-		final String effectiveUsername = envs.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
+		final String effectiveUsername = envs.get(MesosConfigKeys.ENV_HADOOP_USER_NAME);
 		final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
 
 		// configure local directory
@@ -87,34 +87,55 @@ public class MesosTaskManagerRunner {
 			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
 		}
 
-		LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'",
-			UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
+		final String keytab = envs.get(MesosConfigKeys.ENV_KEYTAB);
+		LOG.info("Keytab file:{}", keytab);
 
-		// tell akka to die in case of an error
-		configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+		final String principal = envs.get(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL);
+		LOG.info("Keytab principal:{}", principal);
 
-		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(effectiveUsername);
-		for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-			ugi.addToken(toks);
+		if(keytab != null && keytab.length() != 0) {
+			File f = new File(".", keytab);
+			if(!f.exists()) {
+				LOG.error("Could not locate keytab file:[" + keytab + "]");
+				System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+			}
+			configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab);
+			configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, principal);
 		}
 
+		// tell akka to die in case of an error
+		configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+
 		// Infer the resource identifier from the environment variable
 		String containerID = Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID));
 		final ResourceID resourceId = new ResourceID(containerID);
 		LOG.info("ResourceID assigned for this container: {}", resourceId);
 
-		ugi.doAs(new PrivilegedAction<Object>() {
-			@Override
-			public Object run() {
-				try {
+		String hadoopConfDir = envs.get(MesosConfigKeys.ENV_HADOOP_CONF_DIR);
+		LOG.info("hadoopConfDir: {}", hadoopConfDir);
+
+		SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+		sc.setFlinkConfiguration(configuration);
+		if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
+			sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
+		}
+
+		try {
+			SecurityContext.install(sc);
+			LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'",
+					UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
+			SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Object>() {
+				@Override
+				public Object run() throws Exception {
 					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
+					return null;
 				}
-				catch (Throwable t) {
-					LOG.error("Error while starting the TaskManager", t);
-					System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-				}
-				return null;
-			}
-		});
+			});
+		}
+		catch (Throwable t) {
+			LOG.error("Error while starting the TaskManager", t);
+			System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+		}
+
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/673c724e/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index 02892d3..4b7c731 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.security;
 
 import java.util.concurrent.Callable;
+			LOG.info("Hadoop security is enabled");
 
 /**
  * A security context with may be required to run a Callable.


Mime
View raw message