flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [23/31] flink git commit: [FLINK-8275] [security, yarn] Remove test-specific code path in YarnTaskManagerRunner
Date Tue, 06 Feb 2018 19:02:47 GMT
[FLINK-8275] [security, yarn] Remove test-specific code path in YarnTaskManagerRunner

Previously, the YarnTaskManagerRunner contained a code path that exists
for the sole purpose of injecting mock runners. Having code paths just
to utilize tests in production code is in general a bad idea.

This commit fixes this be making YarnTaskManagerRunner a factory-like
class, which creates a Runner that contains all the runner’s properties,
such as configuration. Unit tests can than test against the contained
configuration in the created Runner to validate that everything is
configured properly.

This closes #5172.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97f0cac2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97f0cac2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97f0cac2

Branch: refs/heads/master
Commit: 97f0cac2af3a1140fa68090d94d83e009ad1e684
Parents: 2bbf4e5
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Fri Feb 2 15:53:13 2018 +0100
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Feb 6 19:58:59 2018 +0100

----------------------------------------------------------------------
 .../yarn/TestingYarnTaskManagerRunner.java      |  12 +-
 .../flink/yarn/TestingYarnTaskManager.scala     |  14 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   8 +-
 .../flink/yarn/YarnTaskManagerRunner.java       | 165 ----------------
 .../yarn/YarnTaskManagerRunnerFactory.java      | 186 +++++++++++++++++++
 .../org/apache/flink/yarn/YarnTaskManager.scala |  30 ++-
 .../yarn/YarnTaskManagerRunnerFactoryTest.java  |  76 ++++++++
 .../flink/yarn/YarnTaskManagerRunnerTest.java   |  74 --------
 8 files changed, 312 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97f0cac2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
index 90cdf9f..ff030be 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.runtime.security.SecurityUtils;
+
 import java.io.IOException;
 
 /**
@@ -25,7 +27,13 @@ import java.io.IOException;
  */
 public class TestingYarnTaskManagerRunner {
 	public static void main(String[] args) throws IOException {
-		YarnTaskManagerRunner.runYarnTaskManager(
-				args, TestingYarnTaskManager.class, System.getenv(), null);
+		YarnTaskManagerRunnerFactory.Runner tmRunner = YarnTaskManagerRunnerFactory.create(
+			args, TestingYarnTaskManager.class, System.getenv());
+
+		try {
+			SecurityUtils.getInstalledContext().runSecured(tmRunner);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97f0cac2/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index da95e01..e33aa49 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
+import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
@@ -68,12 +69,19 @@ class TestingYarnTaskManager(
   object YarnTaskManager {
 
     /** Entry point (main method) to run the TaskManager on YARN.
- *
+      *
       * @param args The command line arguments.
       */
     def main(args: Array[String]): Unit = {
-      YarnTaskManagerRunner.runYarnTaskManager(
-        args, classOf[TestingYarnTaskManager], System.getenv(), null)
+      val tmRunner = YarnTaskManagerRunnerFactory.create(
+        args, classOf[TestingYarnTaskManager], System.getenv())
+
+      try {
+        SecurityUtils.getInstalledContext.runSecured(tmRunner)
+      } catch {
+        case e: Exception =>
+          throw new RuntimeException(e)
+      }
     }
 
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/97f0cac2/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 1813034..9d1af35 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -163,6 +163,9 @@ public class YarnApplicationMasterRunner {
 
 			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
 			if (remoteKeytabPrincipal != null && f.exists()) {
+				String keytabPath = f.getAbsolutePath();
+				LOG.debug("keytabPath: {}", keytabPath);
+
 				// set keytab principal and replace path with the local path of the shipped keytab file
in NodeManager
 				flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, f.getAbsolutePath());
 				flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
@@ -248,7 +251,10 @@ public class YarnApplicationMasterRunner {
 
 			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
 			if (remoteKeytabPrincipal != null && f.exists()) {
-				config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, f.getAbsolutePath());
+				String keytabPath = f.getAbsolutePath();
+				LOG.debug("keytabPath: {}", keytabPath);
+
+				config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
 				config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97f0cac2/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
deleted file mode 100644
index 9d69fbd..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.security.SecurityConfiguration;
-import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.security.modules.HadoopModule;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-/**
- * The entry point for running a TaskManager in a YARN container.
- */
-public class YarnTaskManagerRunner {
-
-	private static final Logger LOG = LoggerFactory.getLogger(YarnTaskManagerRunner.class);
-
-	private static Callable<Object> createMainRunner(
-			Configuration configuration,
-			ResourceID resourceId,
-			final Class<? extends YarnTaskManager> taskManager) {
-		return new Callable<Object>() {
-			@Override
-			public Integer call() {
-				try {
-					TaskManager.selectNetworkInterfaceAndRunTaskManager(
-							configuration, resourceId, taskManager);
-				} catch (Throwable t) {
-					LOG.error("Error while starting the TaskManager", t);
-					System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-				}
-				return null;
-			}
-		};
-	}
-
-	public static void runYarnTaskManager(String[] args,
-																				final Class<? extends YarnTaskManager> taskManager,
-																				Map<String, String> envs,
-																				Callable<Object> mainRunner) throws IOException {
-		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager", args);
-		SignalHandler.register(LOG);
-		JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
-		// try to parse the command line arguments
-		final Configuration configuration;
-		try {
-			configuration = TaskManager.parseArgsAndLoadConfig(args);
-		}
-		catch (Throwable t) {
-			LOG.error(t.getMessage(), t);
-			System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-			return;
-		}
-
-		// read the environment variables for YARN
-		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
-		LOG.info("Current working/local Directory: {}", localDirs);
-
-		final String currDir = envs.get(Environment.PWD.key());
-		LOG.info("Current working Directory: {}", currDir);
-
-		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-		LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
-
-		// configure local directory
-		if (configuration.contains(CoreOptions.TMP_DIRS)) {
-			LOG.info("Overriding YARN's temporary file directories with those " +
-				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-		}
-		else {
-			LOG.info("Setting directories for temporary files to: {}", localDirs);
-			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
-		}
-
-		// tell akka to die in case of an error
-		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
-
-		UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-
-		LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
-				currentUser.getShortUserName(), yarnClientUsername);
-
-		// Infer the resource identifier from the environment variable
-		String containerID = Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID));
-		final ResourceID resourceId = new ResourceID(containerID);
-		LOG.info("ResourceID assigned for this container: {}", resourceId);
-
-		File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
-		if (remoteKeytabPrincipal != null && f.exists()) {
-			// set keytab principal and replace path with the local path of the shipped keytab file
in NodeManager
-			configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, f.getAbsolutePath());
-			configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
-		}
-		try {
-
-			SecurityConfiguration sc;
-
-			//To support Yarn Secure Integration Test Scenario
-			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
-			if (krb5Conf.exists() && krb5Conf.canRead()) {
-				String krb5Path = krb5Conf.getAbsolutePath();
-				LOG.info("KRB5 Conf: {}", krb5Path);
-				org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
-				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
-				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
"true");
-
-				sc = new SecurityConfiguration(configuration,
-					Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, hadoopConfiguration)));
-
-			} else {
-				sc = new SecurityConfiguration(configuration);
-
-			}
-
-			SecurityUtils.install(sc);
-
-			if (mainRunner == null) {
-				mainRunner = createMainRunner(configuration, resourceId, taskManager);
-			}
-			SecurityUtils.getInstalledContext().runSecured(mainRunner);
-		} catch (Exception e) {
-			LOG.error("Exception occurred while launching Task Manager", e);
-			throw new RuntimeException(e);
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f0cac2/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
new file mode 100644
index 0000000..d14248c
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityConfiguration;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.HadoopModule;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * The entry point for running a TaskManager in a YARN container.
+ */
+public class YarnTaskManagerRunnerFactory {
+
+	private static final Logger LOG = LoggerFactory.getLogger(YarnTaskManagerRunnerFactory.class);
+
+	/**
+	 * Runner for the {@link YarnTaskManager}.
+	 */
+	public static class Runner implements Callable<Object> {
+
+		private final Configuration configuration;
+		private final ResourceID resourceId;
+		private final Class<? extends YarnTaskManager> taskManager;
+
+		Runner(Configuration configuration, ResourceID resourceId, Class<? extends YarnTaskManager>
taskManager) {
+			this.configuration = Preconditions.checkNotNull(configuration);
+			this.resourceId = Preconditions.checkNotNull(resourceId);
+			this.taskManager = Preconditions.checkNotNull(taskManager);
+		}
+
+		@Override
+		public Object call() throws Exception {
+			try {
+				TaskManager.selectNetworkInterfaceAndRunTaskManager(
+					configuration, resourceId, taskManager);
+			} catch (Throwable t) {
+				LOG.error("Error while starting the TaskManager", t);
+				System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+			}
+			return null;
+		}
+
+		@VisibleForTesting
+		Configuration getConfiguration() {
+			return configuration;
+		}
+
+		@VisibleForTesting
+		ResourceID getResourceId() {
+			return resourceId;
+		}
+	}
+
+	/**
+	 * Creates a {@link YarnTaskManagerRunnerFactory.Runner}.
+	 */
+	public static Runner create(
+			String[] args,
+			final Class<? extends YarnTaskManager> taskManager,
+			Map<String, String> envs) throws IOException {
+
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager", args);
+		SignalHandler.register(LOG);
+		JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+		// try to parse the command line arguments
+		final Configuration configuration;
+		try {
+			configuration = TaskManager.parseArgsAndLoadConfig(args);
+		}
+		catch (Throwable t) {
+			LOG.error(t.getMessage(), t);
+			System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+			return null;
+		}
+
+		// read the environment variables for YARN
+		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+		LOG.info("Current working/local Directory: {}", localDirs);
+
+		final String currDir = envs.get(Environment.PWD.key());
+		LOG.info("Current working Directory: {}", currDir);
+
+		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
+
+		// configure local directory
+		if (configuration.contains(CoreOptions.TMP_DIRS)) {
+			LOG.info("Overriding YARN's temporary file directories with those " +
+				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
+		}
+		else {
+			LOG.info("Setting directories for temporary files to: {}", localDirs);
+			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
+		}
+
+		// tell akka to die in case of an error
+		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+
+		UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+		LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+				currentUser.getShortUserName(), yarnClientUsername);
+
+		// Infer the resource identifier from the environment variable
+		String containerID = Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID));
+		final ResourceID resourceId = new ResourceID(containerID);
+		LOG.info("ResourceID assigned for this container: {}", resourceId);
+
+		File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+		if (remoteKeytabPrincipal != null && f.exists()) {
+			// set keytab principal and replace path with the local path of the shipped keytab file
in NodeManager
+			configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, f.getAbsolutePath());
+			configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
+		}
+
+		try {
+			SecurityConfiguration sc;
+
+			//To support Yarn Secure Integration Test Scenario
+			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+			if (krb5Conf.exists() && krb5Conf.canRead()) {
+				String krb5Path = krb5Conf.getAbsolutePath();
+				LOG.info("KRB5 Conf: {}", krb5Path);
+				org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
+				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
"true");
+
+				sc = new SecurityConfiguration(configuration,
+					Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, hadoopConfiguration)));
+
+			} else {
+				sc = new SecurityConfiguration(configuration);
+
+			}
+
+			SecurityUtils.install(sc);
+
+			return new Runner(configuration, resourceId, taskManager);
+		} catch (Exception e) {
+			LOG.error("Exception occurred while building Task Manager runner", e);
+			throw new RuntimeException(e);
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f0cac2/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index 5b8f4ca..08d24b4 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -24,9 +24,12 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
+import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
+import grizzled.slf4j.Logger
+
 /** An extension of the TaskManager that listens for additional YARN related
   * messages.
   */
@@ -56,14 +59,25 @@ class YarnTaskManager(
   }
 }
 
-  object YarnTaskManager {
-    /** Entry point (main method) to run the TaskManager on YARN.
-      *
-      * @param args The command line arguments.
-      */
-    def main(args: Array[String]): Unit = {
-      YarnTaskManagerRunner.runYarnTaskManager(
-        args, classOf[YarnTaskManager], System.getenv(), null)
+object YarnTaskManager {
+
+  val LOG = Logger(classOf[TaskManager])
+
+  /** Entry point (main method) to run the TaskManager on YARN.
+    *
+    * @param args The command line arguments.
+    */
+  def main(args: Array[String]): Unit = {
+    val tmRunner = YarnTaskManagerRunnerFactory.create(
+      args, classOf[YarnTaskManager], System.getenv())
+
+    try {
+      SecurityUtils.getInstalledContext.runSecured(tmRunner)
+    } catch {
+      case e: Exception =>
+        LOG.error("Exception occurred while launching Task Manager runner", e)
+        throw new RuntimeException(e)
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97f0cac2/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactoryTest.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactoryTest.java
new file mode 100644
index 0000000..7f518f1
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactoryTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.	See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.	The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.	You may obtain a copy of the License at
+ *
+ *		 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.HadoopModule;
+import org.apache.flink.runtime.security.modules.SecurityModule;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Tests for {@link YarnTaskManagerRunnerFactory}.
+ */
+public class YarnTaskManagerRunnerFactoryTest {
+
+	@Test
+	public void testKerberosKeytabConfiguration() throws IOException {
+		final String resourceDirPath =
+				Paths.get("src", "test", "resources").toAbsolutePath().toString();
+		final Map<String, String> envs = new HashMap<>();
+		envs.put(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID, "test_container_00001");
+		envs.put(YarnConfigKeys.KEYTAB_PRINCIPAL, "testuser1@domain");
+		envs.put(ApplicationConstants.Environment.PWD.key(), resourceDirPath);
+
+		final YarnTaskManagerRunnerFactory.Runner tmRunner = YarnTaskManagerRunnerFactory.create(
+			new String[]{"--configDir", resourceDirPath},
+			YarnTaskManager.class,
+			envs);
+
+		final List<SecurityModule> modules = SecurityUtils.getInstalledModules();
+		Optional<SecurityModule> moduleOpt =
+			modules.stream().filter(s -> s instanceof HadoopModule).findFirst();
+		if (moduleOpt.isPresent()) {
+			HadoopModule hadoopModule = (HadoopModule) moduleOpt.get();
+			assertEquals("testuser1@domain", hadoopModule.getSecurityConfig().getPrincipal());
+			assertEquals(resourceDirPath + "/" + Utils.KEYTAB_FILE_NAME,
+				hadoopModule.getSecurityConfig().getKeytab());
+		} else {
+			fail("Can not find HadoopModule!");
+		}
+
+		final Configuration configuration = tmRunner.getConfiguration();
+		assertEquals(resourceDirPath + "/" + Utils.KEYTAB_FILE_NAME, configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB));
+		assertEquals("testuser1@domain", configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL));
+		assertEquals("test_container_00001", tmRunner.getResourceId().toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f0cac2/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskManagerRunnerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskManagerRunnerTest.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskManagerRunnerTest.java
deleted file mode 100644
index cdacfc0..0000000
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskManagerRunnerTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.	See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.	The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.	You may obtain a copy of the License at
- *
- *		 http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.security.modules.HadoopModule;
-import org.apache.flink.runtime.security.modules.SecurityModule;
-
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.Callable;
-
-/**
- * Tests for {@link YarnTaskManagerRunner}.
- */
-public class YarnTaskManagerRunnerTest {
-
-	@Test
-	public void testKerberosKeytabConfiguration() throws IOException {
-		final String resourceDirPath =
-				Paths.get("src", "test", "resources").toAbsolutePath().toString();
-		final Map<String, String> envs = new HashMap<>();
-		envs.put(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID, "test_container_00001");
-		envs.put(YarnConfigKeys.KEYTAB_PRINCIPAL, "testuser1@domain");
-		envs.put(ApplicationConstants.Environment.PWD.key(), resourceDirPath);
-		YarnTaskManagerRunner.runYarnTaskManager(
-				new String[]{"--configDir", resourceDirPath},
-				YarnTaskManager.class,
-				envs,
-				new Callable<Object>() {
-					@Override
-					public Integer call() {
-						final List<SecurityModule> modules = SecurityUtils.getInstalledModules();
-						Optional<SecurityModule> moduleOpt =
-								modules.stream().filter(s -> s instanceof HadoopModule).findFirst();
-						if (moduleOpt.isPresent()) {
-							HadoopModule hadoopModule = (HadoopModule) moduleOpt.get();
-							assertEquals("testuser1@domain", hadoopModule.getSecurityConfig().getPrincipal());
-							assertEquals(resourceDirPath + "/krb5.keytab",
-									hadoopModule.getSecurityConfig().getKeytab());
-						} else {
-							fail("Can not find HadoopModule!");
-						}
-						return null;
-					}
-				});
-	}
-}


Mime
View raw message