flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [28/31] flink git commit: [FLINK-8275] [security, yarn] Fix keytab local path in YarnTaskManagerRunner
Date Tue, 06 Feb 2018 19:02:52 GMT
[FLINK-8275] [security, yarn] Fix keytab local path in YarnTaskManagerRunner


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bbf4e50
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bbf4e50
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bbf4e50

Branch: refs/heads/master
Commit: 2bbf4e5002d7657018ba0b53b7a3ed8ee3124da8
Parents: a79916b
Author: Shuyi Chen <shuyi@uber.com>
Authored: Sun Dec 17 23:50:07 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Feb 6 19:58:59 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/security/SecurityUtils.java   |  4 +-
 .../runtime/security/modules/HadoopModule.java  |  6 ++
 .../yarn/TestingYarnTaskManagerRunner.java      |  3 +-
 .../flink/yarn/TestingYarnTaskManager.scala     |  3 +-
 .../flink/yarn/YarnApplicationMasterRunner.java | 31 ++------
 .../flink/yarn/YarnTaskManagerRunner.java       | 64 +++++++++--------
 .../org/apache/flink/yarn/YarnTaskManager.scala |  3 +-
 .../flink/yarn/YarnTaskManagerRunnerTest.java   | 74 ++++++++++++++++++++
 flink-yarn/src/test/resources/flink-conf.yaml   | 23 ++++++
 flink-yarn/src/test/resources/krb5.keytab       |  0
 pom.xml                                         |  1 +
 11 files changed, 151 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2bbf4e50/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index c73e966..cb68da4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.security;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.security.modules.SecurityModule;
 import org.apache.flink.runtime.security.modules.SecurityModuleFactory;
 
@@ -47,8 +46,7 @@ public class SecurityUtils {
 		return installedContext;
 	}
 
-	@VisibleForTesting
-	static List<SecurityModule> getInstalledModules() {
+	public static List<SecurityModule> getInstalledModules() {
 		return installedModules;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2bbf4e50/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index a39d879..0738708 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.security.modules;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.util.HadoopUtils;
 
@@ -58,6 +59,11 @@ public class HadoopModule implements SecurityModule {
 		this.hadoopConfiguration = checkNotNull(hadoopConfiguration);
 	}
 
+	@VisibleForTesting
+	public SecurityConfiguration getSecurityConfig() {
+		return securityConfig;
+	}
+
 	@Override
 	public void install() throws SecurityInstallException {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2bbf4e50/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
index 8586a77..90cdf9f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
@@ -25,6 +25,7 @@ import java.io.IOException;
  */
 public class TestingYarnTaskManagerRunner {
 	public static void main(String[] args) throws IOException {
-		YarnTaskManagerRunner.runYarnTaskManager(args, TestingYarnTaskManager.class);
+		YarnTaskManagerRunner.runYarnTaskManager(
+				args, TestingYarnTaskManager.class, System.getenv(), null);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2bbf4e50/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index f54528f..da95e01 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -72,7 +72,8 @@ class TestingYarnTaskManager(
       * @param args The command line arguments.
       */
     def main(args: Array[String]): Unit = {
-      YarnTaskManagerRunner.runYarnTaskManager(args, classOf[TestingYarnTaskManager])
+      YarnTaskManagerRunner.runYarnTaskManager(
+        args, classOf[TestingYarnTaskManager], System.getenv(), null)
     }
 
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2bbf4e50/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 68c0aec..1813034 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -146,19 +146,9 @@ public class YarnApplicationMasterRunner {
 			require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
 			LOG.debug("Current working Directory: {}", currDir);
 
-			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
-			LOG.debug("remoteKeytabPath obtained {}", remoteKeytabPath);
-
 			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
 			LOG.info("remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
 
-			String keytabPath = null;
-			if (remoteKeytabPath != null) {
-				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
-				keytabPath = f.getAbsolutePath();
-				LOG.debug("keytabPath: {}", keytabPath);
-			}
-
 			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
 
 			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
@@ -171,9 +161,10 @@ public class YarnApplicationMasterRunner {
 
 			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties, LOG);
 
-			// set keytab principal and replace path with the local path of the shipped keytab file in NodeManager
-			if (keytabPath != null && remoteKeytabPrincipal != null) {
-				flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+			if (remoteKeytabPrincipal != null && f.exists()) {
+				// set keytab principal and replace path with the local path of the shipped keytab file in NodeManager
+				flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, f.getAbsolutePath());
 				flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
 			}
 
@@ -253,19 +244,11 @@ public class YarnApplicationMasterRunner {
 
 			LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
 
-			//Update keytab and principal path to reflect YARN container path location
-			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
-
 			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
 
-			String keytabPath = null;
-			if (remoteKeytabPath != null) {
-				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
-				keytabPath = f.getAbsolutePath();
-				LOG.info("keytabPath: {}", keytabPath);
-			}
-			if (keytabPath != null && remoteKeytabPrincipal != null) {
-				config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+			if (remoteKeytabPrincipal != null && f.exists()) {
+				config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, f.getAbsolutePath());
 				config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2bbf4e50/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index a6e34c5..9d69fbd 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -51,7 +51,29 @@ public class YarnTaskManagerRunner {
 
 	private static final Logger LOG = LoggerFactory.getLogger(YarnTaskManagerRunner.class);
 
-	public static void runYarnTaskManager(String[] args, final Class<? extends YarnTaskManager> taskManager) throws IOException {
+	private static Callable<Object> createMainRunner(
+			Configuration configuration,
+			ResourceID resourceId,
+			final Class<? extends YarnTaskManager> taskManager) {
+		return new Callable<Object>() {
+			@Override
+			public Integer call() {
+				try {
+					TaskManager.selectNetworkInterfaceAndRunTaskManager(
+							configuration, resourceId, taskManager);
+				} catch (Throwable t) {
+					LOG.error("Error while starting the TaskManager", t);
+					System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+				}
+				return null;
+			}
+		};
+	}
+
+	public static void runYarnTaskManager(String[] args,
+																				final Class<? extends YarnTaskManager> taskManager,
+																				Map<String, String> envs,
+																				Callable<Object> mainRunner) throws IOException {
 		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager", args);
 		SignalHandler.register(LOG);
 		JvmShutdownSafeguard.installAsShutdownHook(LOG);
@@ -68,7 +90,6 @@ public class YarnTaskManagerRunner {
 		}
 
 		// read the environment variables for YARN
-		final Map<String, String> envs = System.getenv();
 		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
 		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
 		LOG.info("Current working/local Directory: {}", localDirs);
@@ -76,9 +97,6 @@ public class YarnTaskManagerRunner {
 		final String currDir = envs.get(Environment.PWD.key());
 		LOG.info("Current working Directory: {}", currDir);
 
-		final String remoteKeytabPath = envs.get(YarnConfigKeys.KEYTAB_PATH);
-		LOG.info("TM: remoteKeytabPath obtained {}", remoteKeytabPath);
-
 		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
 		LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
 
@@ -95,13 +113,6 @@ public class YarnTaskManagerRunner {
 		// tell akka to die in case of an error
 		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
 
-		String localKeytabPath = null;
-		if (remoteKeytabPath != null) {
-			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
-			localKeytabPath = f.getAbsolutePath();
-			LOG.info("localKeytabPath: {}", localKeytabPath);
-		}
-
 		UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
 
 		LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
@@ -112,6 +123,12 @@ public class YarnTaskManagerRunner {
 		final ResourceID resourceId = new ResourceID(containerID);
 		LOG.info("ResourceID assigned for this container: {}", resourceId);
 
+		File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+		if (remoteKeytabPrincipal != null && f.exists()) {
+			// set keytab principal and replace path with the local path of the shipped keytab file in NodeManager
+			configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, f.getAbsolutePath());
+			configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
+		}
 		try {
 
 			SecurityConfiguration sc;
@@ -125,12 +142,6 @@ public class YarnTaskManagerRunner {
 				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
 				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
 
-				// set keytab principal and replace path with the local path of the shipped keytab file in NodeManager
-				if (localKeytabPath != null && remoteKeytabPrincipal != null) {
-					configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, localKeytabPath);
-					configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
-				}
-
 				sc = new SecurityConfiguration(configuration,
 					Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, hadoopConfiguration)));
 
@@ -141,19 +152,10 @@ public class YarnTaskManagerRunner {
 
 			SecurityUtils.install(sc);
 
-			SecurityUtils.getInstalledContext().runSecured(new Callable<Object>() {
-				@Override
-				public Integer call() {
-					try {
-						TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
-					}
-					catch (Throwable t) {
-						LOG.error("Error while starting the TaskManager", t);
-						System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-					}
-					return null;
-				}
-			});
+			if (mainRunner == null) {
+				mainRunner = createMainRunner(configuration, resourceId, taskManager);
+			}
+			SecurityUtils.getInstalledContext().runSecured(mainRunner);
 		} catch (Exception e) {
 			LOG.error("Exception occurred while launching Task Manager", e);
 			throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/flink/blob/2bbf4e50/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index 615466d..5b8f4ca 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -62,7 +62,8 @@ class YarnTaskManager(
       * @param args The command line arguments.
       */
     def main(args: Array[String]): Unit = {
-      YarnTaskManagerRunner.runYarnTaskManager(args, classOf[YarnTaskManager])
+      YarnTaskManagerRunner.runYarnTaskManager(
+        args, classOf[YarnTaskManager], System.getenv(), null)
     }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2bbf4e50/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskManagerRunnerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskManagerRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskManagerRunnerTest.java
new file mode 100644
index 0000000..cdacfc0
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskManagerRunnerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.	See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.	The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.	You may obtain a copy of the License at
+ *
+ *		 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.HadoopModule;
+import org.apache.flink.runtime.security.modules.SecurityModule;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+
+/**
+ * Tests for {@link YarnTaskManagerRunner}.
+ */
+public class YarnTaskManagerRunnerTest {
+
+	@Test
+	public void testKerberosKeytabConfiguration() throws IOException {
+		final String resourceDirPath =
+				Paths.get("src", "test", "resources").toAbsolutePath().toString();
+		final Map<String, String> envs = new HashMap<>();
+		envs.put(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID, "test_container_00001");
+		envs.put(YarnConfigKeys.KEYTAB_PRINCIPAL, "testuser1@domain");
+		envs.put(ApplicationConstants.Environment.PWD.key(), resourceDirPath);
+		YarnTaskManagerRunner.runYarnTaskManager(
+				new String[]{"--configDir", resourceDirPath},
+				YarnTaskManager.class,
+				envs,
+				new Callable<Object>() {
+					@Override
+					public Integer call() {
+						final List<SecurityModule> modules = SecurityUtils.getInstalledModules();
+						Optional<SecurityModule> moduleOpt =
+								modules.stream().filter(s -> s instanceof HadoopModule).findFirst();
+						if (moduleOpt.isPresent()) {
+							HadoopModule hadoopModule = (HadoopModule) moduleOpt.get();
+							assertEquals("testuser1@domain", hadoopModule.getSecurityConfig().getPrincipal());
+							assertEquals(resourceDirPath + "/krb5.keytab",
+									hadoopModule.getSecurityConfig().getKeytab());
+						} else {
+							fail("Can not find HadoopModule!");
+						}
+						return null;
+					}
+				});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2bbf4e50/flink-yarn/src/test/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/resources/flink-conf.yaml b/flink-yarn/src/test/resources/flink-conf.yaml
new file mode 100644
index 0000000..2c95194
--- /dev/null
+++ b/flink-yarn/src/test/resources/flink-conf.yaml
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+#
+# This is a test configuration for validation of YarnTaskManagerRunner.
+#
+
+taskmanager.tmp.dirs: /tmp

http://git-wip-us.apache.org/repos/asf/flink/blob/2bbf4e50/flink-yarn/src/test/resources/krb5.keytab
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/resources/krb5.keytab b/flink-yarn/src/test/resources/krb5.keytab
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/flink/blob/2bbf4e50/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3065abf..83445dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1015,6 +1015,7 @@ under the License.
 						<exclude>flink-formats/flink-avro/src/test/resources/avro/*.avsc</exclude>
 						<exclude>out/test/flink-avro/avro/user.avsc</exclude>
 						<exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude>
+						<exclude>flink-yarn/src/test/resources/krb5.keytab</exclude>
 						<exclude>test-infra/end-to-end-test/test-data/*</exclude>
 
 						<!-- snapshots -->


Mime
View raw message