From: sewen@apache.org
To: commits@flink.apache.org
Date: Mon, 13 Feb 2017 19:53:49 -0000
Subject: [1/2] flink git commit: [FLINK-5631] [yarn] Support downloading additional jars from non-HDFS paths.

Repository: flink
Updated Branches:
  refs/heads/master 6bc6b225e -> 186b12309


[FLINK-5631] [yarn] Support downloading additional jars from non-HDFS paths.
This closes #3202 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/186b1230 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/186b1230 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/186b1230 Branch: refs/heads/master Commit: 186b12309b540f82a055be28f3f005dce4b8cf46 Parents: 30c5b77 Author: Haohui Mai Authored: Tue Jan 31 12:11:01 2017 -0800 Committer: Stephan Ewen Committed: Mon Feb 13 20:51:50 2017 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/yarn/Utils.java | 223 +++++++++++++- .../flink/yarn/YarnApplicationMasterRunner.java | 236 +-------------- .../apache/flink/yarn/YarnResourceManager.java | 211 +------------ .../java/org/apache/flink/yarn/UtilsTest.java | 298 +++++++++++++++++++ .../yarn/YarnApplicationMasterRunnerTest.java | 93 ++++++ .../yarn/YarnFlinkResourceManagerTest.java | 298 ------------------- 6 files changed, 617 insertions(+), 742 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 94d4582..60f7204 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -23,11 +23,16 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.util.Records; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +58,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.ConverterUtils; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; + /** * Utility class that provides helper methods to work with Apache Hadoop YARN. */ @@ -107,7 +114,7 @@ public final class Utils { addToEnvironment( appMasterEnv, Environment.CLASSPATH.name(), - appMasterEnv.get(YarnConfigKeys.ENV_FLINK_CLASSPATH)); + appMasterEnv.get(ENV_FLINK_CLASSPATH)); String[] applicationClassPathEntries = conf.getStrings( YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH); @@ -264,4 +271,218 @@ public final class Utils { } return result; } + + /** + * Creates the launch context, which describes how to bring up a TaskExecutor / TaskManager process in + * an allocated YARN container. + * + *

This code is extremely YARN specific and registers all the resources that the TaskExecutor + * needs (such as JAR file, config file, ...) and all environment variables in a YARN + * container launch context. The launch context then ensures that those resources will be + * copied into the containers transient working directory. + * + * @param flinkConfig + * The Flink configuration object. + * @param yarnConfig + * The YARN configuration object. + * @param env + * The environment variables. + * @param tmParams + * The TaskExecutor container memory parameters. + * @param taskManagerConfig + * The configuration for the TaskExecutors. + * @param workingDirectory + * The current application master container's working directory. + * @param taskManagerMainClass + * The class with the main method. + * @param log + * The logger. + * + * @return The launch context for the TaskManager processes. + * + * @throws Exception Thrown if teh launch context could not be created, for example if + * the resources could not be copied. + */ + static ContainerLaunchContext createTaskExecutorContext( + org.apache.flink.configuration.Configuration flinkConfig, + YarnConfiguration yarnConfig, + Map env, + ContaineredTaskManagerParameters tmParams, + org.apache.flink.configuration.Configuration taskManagerConfig, + String workingDirectory, + Class taskManagerMainClass, + Logger log) throws Exception { + + // get and validate all relevant variables + + String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH); + require(remoteFlinkJarPath != null, "Environment variable %s not set", YarnConfigKeys.FLINK_JAR_PATH); + + String appId = env.get(YarnConfigKeys.ENV_APP_ID); + require(appId != null, "Environment variable %s not set", YarnConfigKeys.ENV_APP_ID); + + String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR); + require(clientHomeDir != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_HOME_DIR); + + String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES); + require(shipListString != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES); + + String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME); + + final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH); + log.info("TM:remote keytab path obtained {}", remoteKeytabPath); + + final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal); + + final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH); + log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath); + + final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH); + log.info("TM:remote krb5 path obtained {}", remoteKrb5Path); + + String classPathString = env.get(ENV_FLINK_CLASSPATH); + require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH); + + //register keytab + LocalResource keytabResource = null; + if(remoteKeytabPath != null) { + log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath); + keytabResource = Records.newRecord(LocalResource.class); + Path keytabPath = new Path(remoteKeytabPath); + FileSystem fs = keytabPath.getFileSystem(yarnConfig); + registerLocalResource(fs, keytabPath, keytabResource); + } + + //To support Yarn Secure Integration Test Scenario + LocalResource 
yarnConfResource = null; + LocalResource krb5ConfResource = null; + boolean hasKrb5 = false; + if(remoteYarnConfPath != null && remoteKrb5Path != null) { + log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath); + yarnConfResource = Records.newRecord(LocalResource.class); + Path yarnConfPath = new Path(remoteYarnConfPath); + FileSystem fs = yarnConfPath.getFileSystem(yarnConfig); + registerLocalResource(fs, yarnConfPath, yarnConfResource); + + log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path); + krb5ConfResource = Records.newRecord(LocalResource.class); + Path krb5ConfPath = new Path(remoteKrb5Path); + fs = krb5ConfPath.getFileSystem(yarnConfig); + registerLocalResource(fs, krb5ConfPath, krb5ConfResource); + + hasKrb5 = true; + } + + // register Flink Jar with remote HDFS + LocalResource flinkJar = Records.newRecord(LocalResource.class); + { + Path remoteJarPath = new Path(remoteFlinkJarPath); + FileSystem fs = remoteJarPath.getFileSystem(yarnConfig); + registerLocalResource(fs, remoteJarPath, flinkJar); + } + + // register conf with local fs + LocalResource flinkConf = Records.newRecord(LocalResource.class); + { + // write the TaskManager configuration to a local file + final File taskManagerConfigFile = + new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml"); + log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath()); + BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile); + + Path homeDirPath = new Path(clientHomeDir); + FileSystem fs = homeDirPath.getFileSystem(yarnConfig); + setupLocalResource(fs, appId, + new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir)); + + log.info("Prepared local resource for modified yaml: {}", flinkConf); + } + + Map taskManagerLocalResources = new HashMap<>(); + taskManagerLocalResources.put("flink.jar", flinkJar); + taskManagerLocalResources.put("flink-conf.yaml", flinkConf); + + //To support Yarn Secure Integration Test Scenario + if(yarnConfResource != null && krb5ConfResource != null) { + taskManagerLocalResources.put(YARN_SITE_FILE_NAME, yarnConfResource); + taskManagerLocalResources.put(KRB5_FILE_NAME, krb5ConfResource); + } + + if(keytabResource != null) { + taskManagerLocalResources.put(KEYTAB_FILE_NAME, keytabResource); + } + + // prepare additional files to be shipped + for (String pathStr : shipListString.split(",")) { + if (!pathStr.isEmpty()) { + LocalResource resource = Records.newRecord(LocalResource.class); + Path path = new Path(pathStr); + registerLocalResource(path.getFileSystem(yarnConfig), path, resource); + taskManagerLocalResources.put(path.getName(), resource); + } + } + + // now that all resources are prepared, we can create the launch context + + log.info("Creating container launch context for TaskManagers"); + + boolean hasLogback = new File(workingDirectory, "logback.xml").exists(); + boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists(); + + String launchCommand = BootstrapTools.getTaskManagerShellCommand( + flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR, + hasLogback, hasLog4j, hasKrb5, taskManagerMainClass); + + log.info("Starting TaskManagers with command: " + launchCommand); + + ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); + ctx.setCommands(Collections.singletonList(launchCommand)); + ctx.setLocalResources(taskManagerLocalResources); + + Map containerEnv = 
new HashMap<>(); + containerEnv.putAll(tmParams.taskManagerEnv()); + + // add YARN classpath, etc to the container environment + containerEnv.put(ENV_FLINK_CLASSPATH, classPathString); + setupYarnClassPath(yarnConfig, containerEnv); + + containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); + + if(remoteKeytabPath != null && remoteKeytabPrincipal != null) { + containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath); + containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal); + } + + ctx.setEnvironment(containerEnv); + + try (DataOutputBuffer dob = new DataOutputBuffer()) { + log.debug("Adding security tokens to Task Executor Container launch Context...."); + UserGroupInformation user = UserGroupInformation.getCurrentUser(); + Credentials credentials = user.getCredentials(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + ctx.setTokens(securityTokens); + } + catch (Throwable t) { + log.error("Getting current user info failed when trying to launch the container", t); + } + + return ctx; + } + + /** + * Validates a condition, throwing a RuntimeException if the condition is violated. + * + * @param condition The condition. + * @param message The message for the runtime exception, with format variables as defined by + * {@link String#format(String, Object...)}. + * @param values The format arguments. + */ + static void require(boolean condition, String message, Object... values) { + if (!condition) { + throw new RuntimeException(String.format(message, values)); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 5cc51e4..9d5673c 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -47,16 +47,10 @@ import org.apache.flink.runtime.webmonitor.WebMonitor; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.Records; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,19 +60,14 @@ import scala.Some; import scala.concurrent.duration.FiniteDuration; import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Map; -import java.util.HashMap; -import java.util.UUID; -import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static 
org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; +import static org.apache.flink.yarn.Utils.require; /** * This class is the executable entry point for the YARN application master. @@ -329,7 +318,7 @@ public class YarnApplicationMasterRunner { config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT); LOG.debug("TaskManager configuration: {}", taskManagerConfig); - final ContainerLaunchContext taskManagerContext = createTaskManagerContext( + final ContainerLaunchContext taskManagerContext = Utils.createTaskExecutorContext( config, yarnConfig, ENV, taskManagerParameters, taskManagerConfig, currDir, getTaskManagerClass(), LOG); @@ -483,20 +472,6 @@ public class YarnApplicationMasterRunner { // ------------------------------------------------------------------------ /** - * Validates a condition, throwing a RuntimeException if the condition is violated. - * - * @param condition The condition. - * @param message The message for the runtime exception, with format variables as defined by - * {@link String#format(String, Object...)}. - * @param values The format arguments. - */ - private static void require(boolean condition, String message, Object... values) { - if (!condition) { - throw new RuntimeException(String.format(message, values)); - } - } - - /** * * @param baseDirectory * @param additional @@ -549,211 +524,4 @@ public class YarnApplicationMasterRunner { return configuration; } - - /** - * Creates the launch context, which describes how to bring up a TaskManager process in - * an allocated YARN container. - * - *

This code is extremely YARN specific and registers all the resources that the TaskManager - * needs (such as JAR file, config file, ...) and all environment variables in a YARN - * container launch context. The launch context then ensures that those resources will be - * copied into the containers transient working directory. - * - *

We do this work before we start the ResourceManager actor in order to fail early if - * any of the operations here fail. - * - * @param flinkConfig - * The Flink configuration object. - * @param yarnConfig - * The YARN configuration object. - * @param env - * The environment variables. - * @param tmParams - * The TaskManager container memory parameters. - * @param taskManagerConfig - * The configuration for the TaskManagers. - * @param workingDirectory - * The current application master container's working directory. - * @param taskManagerMainClass - * The class with the main method. - * @param log - * The logger. - * - * @return The launch context for the TaskManager processes. - * - * @throws Exception Thrown if teh launch context could not be created, for example if - * the resources could not be copied. - */ - public static ContainerLaunchContext createTaskManagerContext( - Configuration flinkConfig, - YarnConfiguration yarnConfig, - Map env, - ContaineredTaskManagerParameters tmParams, - Configuration taskManagerConfig, - String workingDirectory, - Class taskManagerMainClass, - Logger log) throws Exception { - - log.info("Setting up resources for TaskManagers"); - - // get and validate all relevant variables - - String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH); - require(remoteFlinkJarPath != null, "Environment variable %s not set", YarnConfigKeys.FLINK_JAR_PATH); - - String appId = env.get(YarnConfigKeys.ENV_APP_ID); - require(appId != null, "Environment variable %s not set", YarnConfigKeys.ENV_APP_ID); - - String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR); - require(clientHomeDir != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_HOME_DIR); - - String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES); - require(shipListString != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES); - - String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME); - - final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH); - LOG.info("TM:remoteKeytabPath obtained {}", remoteKeytabPath); - - final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - LOG.info("TM:remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal); - - final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH); - LOG.info("TM:remoteYarnConfPath obtained {}", remoteYarnConfPath); - - final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH); - LOG.info("TM:remotekrb5Path obtained {}", remoteKrb5Path); - - String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH); - require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH); - - // obtain a handle to the file system used by YARN - final org.apache.hadoop.fs.FileSystem yarnFileSystem; - try { - yarnFileSystem = org.apache.hadoop.fs.FileSystem.get(yarnConfig); - } catch (IOException e) { - throw new Exception("Could not access YARN's default file system", e); - } - - //register keytab - LocalResource keytabResource = null; - if(remoteKeytabPath != null) { - LOG.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath); - keytabResource = Records.newRecord(LocalResource.class); - Path keytabPath = new Path(remoteKeytabPath); - Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource); - } - - //To support Yarn 
Secure Integration Test Scenario - LocalResource yarnConfResource = null; - LocalResource krb5ConfResource = null; - boolean hasKrb5 = false; - if(remoteYarnConfPath != null && remoteKrb5Path != null) { - LOG.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath); - yarnConfResource = Records.newRecord(LocalResource.class); - Path yarnConfPath = new Path(remoteYarnConfPath); - Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource); - - LOG.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path); - krb5ConfResource = Records.newRecord(LocalResource.class); - Path krb5ConfPath = new Path(remoteKrb5Path); - Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource); - - hasKrb5 = true; - } - - // register Flink Jar with remote HDFS - LocalResource flinkJar = Records.newRecord(LocalResource.class); - { - Path remoteJarPath = new Path(remoteFlinkJarPath); - Utils.registerLocalResource(yarnFileSystem, remoteJarPath, flinkJar); - } - - // register conf with local fs - LocalResource flinkConf = Records.newRecord(LocalResource.class); - { - // write the TaskManager configuration to a local file - final File taskManagerConfigFile = - new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml"); - LOG.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath()); - BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile); - - Utils.setupLocalResource(yarnFileSystem, appId, - new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir)); - - log.info("Prepared local resource for modified yaml: {}", flinkConf); - } - - Map taskManagerLocalResources = new HashMap<>(); - taskManagerLocalResources.put("flink.jar", flinkJar); - taskManagerLocalResources.put("flink-conf.yaml", flinkConf); - - //To support Yarn Secure Integration Test Scenario - if(yarnConfResource != null && krb5ConfResource != null) { - taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource); - taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource); - } - - if(keytabResource != null) { - taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource); - } - - // prepare additional files to be shipped - for (String pathStr : shipListString.split(",")) { - if (!pathStr.isEmpty()) { - LocalResource resource = Records.newRecord(LocalResource.class); - Path path = new Path(pathStr); - Utils.registerLocalResource(yarnFileSystem, path, resource); - taskManagerLocalResources.put(path.getName(), resource); - } - } - - // now that all resources are prepared, we can create the launch context - - log.info("Creating container launch context for TaskManagers"); - - boolean hasLogback = new File(workingDirectory, "logback.xml").exists(); - boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists(); - - String launchCommand = BootstrapTools.getTaskManagerShellCommand( - flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR, - hasLogback, hasLog4j, hasKrb5, taskManagerMainClass); - - log.info("Starting TaskManagers with command: " + launchCommand); - - ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); - ctx.setCommands(Collections.singletonList(launchCommand)); - ctx.setLocalResources(taskManagerLocalResources); - - Map containerEnv = new HashMap<>(); - containerEnv.putAll(tmParams.taskManagerEnv()); - - // add YARN classpath, etc to the container environment - 
containerEnv.put(ENV_FLINK_CLASSPATH, classPathString); - Utils.setupYarnClassPath(yarnConfig, containerEnv); - - containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); - - if(remoteKeytabPath != null && remoteKeytabPrincipal != null) { - containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath); - containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal); - } - - ctx.setEnvironment(containerEnv); - - try (DataOutputBuffer dob = new DataOutputBuffer()) { - LOG.debug("Adding security tokens to Task Manager Container launch Context...."); - UserGroupInformation user = UserGroupInformation.getCurrentUser(); - Credentials credentials = user.getCredentials(); - credentials.writeTokenStorageToStream(dob); - ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - ctx.setTokens(securityTokens); - } - catch (Throwable t) { - log.error("Getting current user info failed when trying to launch the container", t); - } - - return ctx; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 9b9ea39..ab96441 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -35,10 +35,6 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; @@ -47,29 +43,20 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.Records; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; import org.apache.flink.util.ExceptionUtils; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Map; import java.util.HashMap; import java.util.List; -import java.util.Collections; -import java.util.UUID; import java.util.concurrent.TimeUnit; -import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; - /** * The yarn implementation of the resource manager. Used when the system is started * via the resource framework YARN. 
@@ -357,7 +344,7 @@ public class YarnResourceManager extends ResourceManager implements flinkConfig, "", 0, 1, teRegistrationTimeout); LOG.debug("TaskManager configuration: {}", taskManagerConfig); - ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorContext( + ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext( flinkConfig, yarnConfig, ENV, taskManagerParameters, taskManagerConfig, currDir, YarnTaskExecutorRunner.class, LOG); @@ -371,204 +358,10 @@ public class YarnResourceManager extends ResourceManager implements } - /** - * Creates the launch context, which describes how to bring up a TaskExecutor process in - * an allocated YARN container. - * - *

This code is extremely YARN specific and registers all the resources that the TaskExecutor - * needs (such as JAR file, config file, ...) and all environment variables in a YARN - * container launch context. The launch context then ensures that those resources will be - * copied into the containers transient working directory. - * - * @param flinkConfig - * The Flink configuration object. - * @param yarnConfig - * The YARN configuration object. - * @param env - * The environment variables. - * @param tmParams - * The TaskExecutor container memory parameters. - * @param taskManagerConfig - * The configuration for the TaskExecutors. - * @param workingDirectory - * The current application master container's working directory. - * @param taskManagerMainClass - * The class with the main method. - * @param log - * The logger. - * - * @return The launch context for the TaskManager processes. - * - * @throws Exception Thrown if teh launch context could not be created, for example if - * the resources could not be copied. - */ - private static ContainerLaunchContext createTaskExecutorContext( - Configuration flinkConfig, - YarnConfiguration yarnConfig, - Map env, - ContaineredTaskManagerParameters tmParams, - Configuration taskManagerConfig, - String workingDirectory, - Class taskManagerMainClass, - Logger log) throws Exception { - - // get and validate all relevant variables - - String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH); - - String appId = env.get(YarnConfigKeys.ENV_APP_ID); - - String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR); - - String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES); - - String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - - final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH); - log.info("TM:remote keytab path obtained {}", remoteKeytabPath); - - final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal); - - final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH); - log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath); - - final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH); - log.info("TM:remote krb5 path obtained {}", remoteKrb5Path); - - String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH); - - // obtain a handle to the file system used by YARN - final org.apache.hadoop.fs.FileSystem yarnFileSystem; - try { - yarnFileSystem = org.apache.hadoop.fs.FileSystem.get(yarnConfig); - } catch (IOException e) { - throw new Exception("Could not access YARN's default file system", e); - } - - //register keytab - LocalResource keytabResource = null; - if(remoteKeytabPath != null) { - log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath); - keytabResource = Records.newRecord(LocalResource.class); - Path keytabPath = new Path(remoteKeytabPath); - Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource); - } - - //To support Yarn Secure Integration Test Scenario - LocalResource yarnConfResource = null; - LocalResource krb5ConfResource = null; - boolean hasKrb5 = false; - if(remoteYarnConfPath != null && remoteKrb5Path != null) { - log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath); - yarnConfResource = Records.newRecord(LocalResource.class); - Path yarnConfPath = new Path(remoteYarnConfPath); - 
Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource); - - log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path); - krb5ConfResource = Records.newRecord(LocalResource.class); - Path krb5ConfPath = new Path(remoteKrb5Path); - Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource); - - hasKrb5 = true; - } - - // register Flink Jar with remote HDFS - LocalResource flinkJar = Records.newRecord(LocalResource.class); - { - Path remoteJarPath = new Path(remoteFlinkJarPath); - Utils.registerLocalResource(yarnFileSystem, remoteJarPath, flinkJar); - } - - // register conf with local fs - LocalResource flinkConf = Records.newRecord(LocalResource.class); - { - // write the TaskManager configuration to a local file - final File taskManagerConfigFile = - new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml"); - log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath()); - BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile); - - Utils.setupLocalResource(yarnFileSystem, appId, - new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir)); - log.info("Prepared local resource for modified yaml: {}", flinkConf); - } - - Map taskManagerLocalResources = new HashMap<>(); - taskManagerLocalResources.put("flink.jar", flinkJar); - taskManagerLocalResources.put("flink-conf.yaml", flinkConf); - - //To support Yarn Secure Integration Test Scenario - if(yarnConfResource != null && krb5ConfResource != null) { - taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource); - taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource); - } - - if(keytabResource != null) { - taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource); - } - - // prepare additional files to be shipped - for (String pathStr : shipListString.split(",")) { - if (!pathStr.isEmpty()) { - LocalResource resource = Records.newRecord(LocalResource.class); - Path path = new Path(pathStr); - Utils.registerLocalResource(yarnFileSystem, path, resource); - taskManagerLocalResources.put(path.getName(), resource); - } - } - - // now that all resources are prepared, we can create the launch context - - log.info("Creating container launch context for TaskManagers"); - - boolean hasLogback = new File(workingDirectory, "logback.xml").exists(); - boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists(); - - String launchCommand = BootstrapTools.getTaskManagerShellCommand( - flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR, - hasLogback, hasLog4j, hasKrb5, taskManagerMainClass); - - log.info("Starting TaskManagers with command: " + launchCommand); - - ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); - ctx.setCommands(Collections.singletonList(launchCommand)); - ctx.setLocalResources(taskManagerLocalResources); - - Map containerEnv = new HashMap<>(); - containerEnv.putAll(tmParams.taskManagerEnv()); - - // add YARN classpath, etc to the container environment - containerEnv.put(ENV_FLINK_CLASSPATH, classPathString); - Utils.setupYarnClassPath(yarnConfig, containerEnv); - - containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); - - if(remoteKeytabPath != null && remoteKeytabPrincipal != null) { - containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath); - containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal); - 
} - - ctx.setEnvironment(containerEnv); - - try (DataOutputBuffer dob = new DataOutputBuffer()) { - log.debug("Adding security tokens to Task Executor Container launch Context...."); - UserGroupInformation user = UserGroupInformation.getCurrentUser(); - Credentials credentials = user.getCredentials(); - credentials.writeTokenStorageToStream(dob); - ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - ctx.setTokens(securityTokens); - } - catch (Throwable t) { - log.error("Getting current user info failed when trying to launch the container", t); - } - - return ctx; - } /** - * Generate priority by given resouce profile. + * Generate priority by given resource profile. * Priority is only used for distinguishing request of different resource. * @param resourceProfile The resource profile of a request * @return The priority of this resource profile. http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java new file mode 100644 index 0000000..8534ba8 --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; +import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered; +import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class UtilsTest extends TestLogger { + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + @Test + public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception { + new JavaTestKit(system) {{ + + final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow(); + + Configuration flinkConfig = new Configuration(); + YarnConfiguration yarnConfig = new YarnConfiguration(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + String applicationMasterHostName = "localhost"; + String webInterfaceURL = "foobar"; + ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters( + 1l, 1l, 1l, 1, new HashMap()); + ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class); + int yarnHeartbeatIntervalMillis = 1000; + int maxFailedContainers = 10; + int numInitialTaskManagers = 5; + 
final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler(); + AMRMClientAsync resourceManagerClient = mock(AMRMClientAsync.class); + NMClient nodeManagerClient = mock(NMClient.class); + UUID leaderSessionID = UUID.randomUUID(); + + final List containerList = new ArrayList<>(); + + for (int i = 0; i < numInitialTaskManagers; i++) { + containerList.add(new TestingContainer("container_" + i, "localhost")); + } + + doAnswer(new Answer() { + int counter = 0; + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (counter < containerList.size()) { + callbackHandler.onContainersAllocated( + Collections.singletonList( + containerList.get(counter++) + )); + } + return null; + } + }).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class)); + + ActorRef resourceManager = null; + ActorRef leader1; + + try { + leader1 = system.actorOf( + Props.create( + TestingUtils.ForwardingActor.class, + getRef(), + Option.apply(leaderSessionID) + )); + + resourceManager = system.actorOf( + Props.create( + TestingYarnFlinkResourceManager.class, + flinkConfig, + yarnConfig, + leaderRetrievalService, + applicationMasterHostName, + webInterfaceURL, + taskManagerParameters, + taskManagerLaunchContext, + yarnHeartbeatIntervalMillis, + maxFailedContainers, + numInitialTaskManagers, + callbackHandler, + resourceManagerClient, + nodeManagerClient + )); + + leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID); + + final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID); + final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Container container = (Container) invocation.getArguments()[0]; + resourceManagerGateway.tell(new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)), + leader1Gateway); + return null; + } + }).when(nodeManagerClient).startContainer(Matchers.any(Container.class), Matchers.any(ContainerLaunchContext.class)); + + expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class); + + resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST)); + + for (int i = 0; i < containerList.size(); i++) { + expectMsgClass(deadline.timeLeft(), Acknowledge.class); + } + + Future taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft()); + + Await.ready(taskManagerRegisteredFuture, deadline.timeLeft()); + + leaderRetrievalService.notifyListener(null, null); + + leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID); + + expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class); + + resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST)); + + for (Container container: containerList) { + resourceManagerGateway.tell( + new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)), + leader1Gateway); + } + + for (int i = 0; i < containerList.size(); i++) { + expectMsgClass(deadline.timeLeft(), Acknowledge.class); + } + + Future numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.Instance, deadline.timeLeft()); + + int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, 
deadline.timeLeft()); + + assertEquals(numInitialTaskManagers, numberOfRegisteredResources); + } finally { + if (resourceManager != null) { + resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + } + }}; + } + + static class TestingContainer extends Container { + + private final String id; + private final String host; + + TestingContainer(String id, String host) { + this.id = id; + this.host = host; + } + + @Override + public ContainerId getId() { + ContainerId containerId = mock(ContainerId.class); + when(containerId.toString()).thenReturn(id); + + return containerId; + } + + @Override + public void setId(ContainerId containerId) { + + } + + @Override + public NodeId getNodeId() { + NodeId nodeId = mock(NodeId.class); + when(nodeId.getHost()).thenReturn(host); + + return nodeId; + } + + @Override + public void setNodeId(NodeId nodeId) { + + } + + @Override + public String getNodeHttpAddress() { + return null; + } + + @Override + public void setNodeHttpAddress(String s) { + + } + + @Override + public Resource getResource() { + return null; + } + + @Override + public void setResource(Resource resource) { + + } + + @Override + public Priority getPriority() { + return null; + } + + @Override + public void setPriority(Priority priority) { + + } + + @Override + public Token getContainerToken() { + return null; + } + + @Override + public void setContainerToken(Token token) { + + } + + @Override + public int compareTo(Container o) { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java new file mode 100644 index 0000000..f874896 --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import com.google.common.collect.ImmutableMap; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Map; + +import static org.apache.flink.yarn.YarnConfigKeys.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +public class YarnApplicationMasterRunnerTest { + private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void testCreateTaskExecutorContext() throws Exception { + File root = folder.getRoot(); + File home = new File(root, "home"); + boolean created = home.mkdir(); + assertTrue(created); + + Answer getDefault = new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return invocationOnMock.getArguments()[1]; + } + }; + Configuration flinkConf = new Configuration(); + YarnConfiguration yarnConf = mock(YarnConfiguration.class); + doAnswer(getDefault).when(yarnConf).get(anyString(), anyString()); + doAnswer(getDefault).when(yarnConf).getInt(anyString(), anyInt()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return new String[] {(String) invocationOnMock.getArguments()[1]}; + } + }).when(yarnConf).getStrings(anyString(), Mockito. anyVararg()); + + Map env = ImmutableMap. 
builder() + .put(ENV_APP_ID, "foo") + .put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath()) + .put(ENV_CLIENT_SHIP_FILES, "") + .put(ENV_FLINK_CLASSPATH, "") + .put(ENV_HADOOP_USER_NAME, "foo") + .put(FLINK_JAR_PATH, root.toURI().toString()) + .build(); + ContaineredTaskManagerParameters tmParams = mock(ContaineredTaskManagerParameters.class); + Configuration taskManagerConf = new Configuration(); + + String workingDirectory = root.getAbsolutePath(); + Class taskManagerMainClass = YarnApplicationMasterRunnerTest.class; + ContainerLaunchContext ctx = Utils.createTaskExecutorContext(flinkConf, yarnConf, env, tmParams, + taskManagerConf, workingDirectory, taskManagerMainClass, LOG); + assertEquals("file", ctx.getLocalResources().get("flink.jar").getResource().getScheme()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java deleted file mode 100644 index a3ff6c4..0000000 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.yarn; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; -import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; -import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.util.TestLogger; -import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered; -import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.NMClient; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import scala.Option; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class YarnFlinkResourceManagerTest extends TestLogger { - - private static ActorSystem system; - - @BeforeClass - public static void setup() { - system = AkkaUtils.createLocalActorSystem(new Configuration()); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - } - - @Test - public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception { - new JavaTestKit(system) {{ - - final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow(); - - Configuration flinkConfig = new Configuration(); - YarnConfiguration yarnConfig = new YarnConfiguration(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); - String applicationMasterHostName = "localhost"; - String webInterfaceURL = "foobar"; - ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters( - 1l, 1l, 1l, 1, new HashMap()); - ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class); - int yarnHeartbeatIntervalMillis = 1000; - int maxFailedContainers = 10; - int 
numInitialTaskManagers = 5; - final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler(); - AMRMClientAsync resourceManagerClient = mock(AMRMClientAsync.class); - NMClient nodeManagerClient = mock(NMClient.class); - UUID leaderSessionID = UUID.randomUUID(); - - final List containerList = new ArrayList<>(); - - for (int i = 0; i < numInitialTaskManagers; i++) { - containerList.add(new TestingContainer("container_" + i, "localhost")); - } - - doAnswer(new Answer() { - int counter = 0; - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - if (counter < containerList.size()) { - callbackHandler.onContainersAllocated( - Collections.singletonList( - containerList.get(counter++) - )); - } - return null; - } - }).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class)); - - ActorRef resourceManager = null; - ActorRef leader1; - - try { - leader1 = system.actorOf( - Props.create( - TestingUtils.ForwardingActor.class, - getRef(), - Option.apply(leaderSessionID) - )); - - resourceManager = system.actorOf( - Props.create( - TestingYarnFlinkResourceManager.class, - flinkConfig, - yarnConfig, - leaderRetrievalService, - applicationMasterHostName, - webInterfaceURL, - taskManagerParameters, - taskManagerLaunchContext, - yarnHeartbeatIntervalMillis, - maxFailedContainers, - numInitialTaskManagers, - callbackHandler, - resourceManagerClient, - nodeManagerClient - )); - - leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID); - - final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID); - final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - Container container = (Container) invocation.getArguments()[0]; - resourceManagerGateway.tell(new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)), - leader1Gateway); - return null; - } - }).when(nodeManagerClient).startContainer(Matchers.any(Container.class), Matchers.any(ContainerLaunchContext.class)); - - expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class); - - resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST)); - - for (int i = 0; i < containerList.size(); i++) { - expectMsgClass(deadline.timeLeft(), Acknowledge.class); - } - - Future taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft()); - - Await.ready(taskManagerRegisteredFuture, deadline.timeLeft()); - - leaderRetrievalService.notifyListener(null, null); - - leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID); - - expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class); - - resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST)); - - for (Container container: containerList) { - resourceManagerGateway.tell( - new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)), - leader1Gateway); - } - - for (int i = 0; i < containerList.size(); i++) { - expectMsgClass(deadline.timeLeft(), Acknowledge.class); - } - - Future numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.Instance, deadline.timeLeft()); - - int numberOfRegisteredResources = (Integer) 
Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft()); - - assertEquals(numInitialTaskManagers, numberOfRegisteredResources); - } finally { - if (resourceManager != null) { - resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); - } - } - }}; - } - - static class TestingContainer extends Container { - - private final String id; - private final String host; - - TestingContainer(String id, String host) { - this.id = id; - this.host = host; - } - - @Override - public ContainerId getId() { - ContainerId containerId = mock(ContainerId.class); - when(containerId.toString()).thenReturn(id); - - return containerId; - } - - @Override - public void setId(ContainerId containerId) { - - } - - @Override - public NodeId getNodeId() { - NodeId nodeId = mock(NodeId.class); - when(nodeId.getHost()).thenReturn(host); - - return nodeId; - } - - @Override - public void setNodeId(NodeId nodeId) { - - } - - @Override - public String getNodeHttpAddress() { - return null; - } - - @Override - public void setNodeHttpAddress(String s) { - - } - - @Override - public Resource getResource() { - return null; - } - - @Override - public void setResource(Resource resource) { - - } - - @Override - public Priority getPriority() { - return null; - } - - @Override - public void setPriority(Priority priority) { - - } - - @Override - public Token getContainerToken() { - return null; - } - - @Override - public void setContainerToken(Token token) { - - } - - @Override - public int compareTo(Container o) { - return 0; - } - } -}
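
----------------------------------------------------------------------

Editor's note: the key behavioural change in this patch is that the shared Utils.createTaskExecutorContext resolves a file system per path (path.getFileSystem(yarnConfig)) instead of taking one handle to YARN's default file system (FileSystem.get(yarnConfig)), which is what allows additional jars to be shipped from non-HDFS locations. The following minimal sketch is not part of the commit; the class name and the file:// path are illustrative assumptions, chosen only to show how the two resolution strategies differ for a path whose scheme is not the cluster default.

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class PerPathFileSystemSketch {

        public static void main(String[] args) throws Exception {
            YarnConfiguration yarnConfig = new YarnConfiguration();

            // Hypothetical resource that does not live on the cluster's default
            // file system, e.g. a local jar or an object-store URI.
            Path extraJar = new Path("file:///tmp/extra-udfs.jar");

            // Old behaviour: one handle to the default FS (typically HDFS);
            // registering a file:// or s3:// path through it fails or misresolves.
            FileSystem defaultFs = FileSystem.get(yarnConfig);

            // New behaviour (as in Utils.createTaskExecutorContext): the path's
            // own scheme decides which FileSystem implementation is used.
            FileSystem perPathFs = extraJar.getFileSystem(yarnConfig);

            System.out.println("default FS : " + defaultFs.getUri());
            System.out.println("per-path FS: " + perPathFs.getUri());
        }
    }

The new UtilsTest/YarnApplicationMasterRunnerTest coverage above exercises exactly this: FLINK_JAR_PATH is set to a file:// URI and the test asserts that the registered "flink.jar" local resource carries the "file" scheme.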