flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-6031][yarn] Add config parameter for user-jar inclusion in classpath
Date Thu, 18 May 2017 12:02:25 GMT
Repository: flink
Updated Branches:
  refs/heads/master e28380152 -> 4a314a80e


[FLINK-6031][yarn] Add config parameter for user-jar inclusion in classpath

This closes #3931


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a314a80
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a314a80
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a314a80

Branch: refs/heads/master
Commit: 4a314a80e64ea2af624131e5c02e1f167a22a354
Parents: e283801
Author: zentol <chesnay@apache.org>
Authored: Wed May 17 18:15:27 2017 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Thu May 18 14:01:53 2017 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |   2 +
 docs/setup/yarn_setup.md                        |  12 ++
 .../yarn/AbstractYarnClusterDescriptor.java     | 152 ++++++++++++-------
 .../yarn/configuration/YarnConfigOptions.java   |  18 +++
 4 files changed, 129 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index c4a7354..8a6f67d 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -457,6 +457,8 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in
the String
 
 - `yarn.tags` A comma-separated list of tags to apply to the Flink YARN application.
 
+- `yarn.per-job-cluster.include-user-jar` (Default: ORDER) Controls whether and how the user jar is included in the system class path for per-job clusters. Setting this parameter to `DISABLED` causes the jar to be included in the user class path instead. Setting this parameter to one of `FIRST`, `LAST` or `ORDER` causes the jar to be included in the system class path, with the jar placed either at the beginning of the class path (`FIRST`), at the end (`LAST`), or according to its lexicographic position among the other class path entries (`ORDER`).
+
 ### Mesos
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index 1ce45ad..190a796 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -245,6 +245,18 @@ Note: You can use a different configuration directory per job by setting
the env
 
 Note: It is possible to combine `-m yarn-cluster` with a detached YARN submission (`-yd`)
to "fire and forget" a Flink job to the YARN cluster. In this case, your application will
not get any accumulator results or exceptions from the ExecutionEnvironment.execute() call!
 
+### User jars & Classpath
+
+By default, Flink includes the user jars in the system classpath when running a single job. This behavior can be controlled with the `yarn.per-job-cluster.include-user-jar` parameter.
+
+When this parameter is set to `DISABLED`, Flink includes the jar in the user classpath instead.
+
+The user jar's position in the class path can be controlled by setting the parameter to one of the following:
+
+- `ORDER`: (default) Adds the jar to the system class path based on the lexicographic order.
+- `FIRST`: Adds the jar to the beginning of the system class path.
+- `LAST`: Adds the jar to the end of the system class path.
+
 ## Recovery behavior of Flink on YARN
 
 Flink's YARN client has the following configuration parameters to control how to behave in
case of container failures. These parameters can be set either from the `conf/flink-conf.yaml`
or when starting the YARN session, using `-D` parameters.

http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 65525f2..3110a5b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -77,6 +78,7 @@ import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -144,7 +146,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	/** Optional Jar file to include in the system class loader of all application nodes
 	 * (for per-job submission) */
-	private Set<File> userJarFiles;
+	private final Set<File> userJarFiles = new HashSet<>();
+
+	private YarnConfigOptions.UserJarInclusion userJarInclusion;
 
 	public AbstractYarnClusterDescriptor() {
 		// for unit tests only
@@ -172,6 +176,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
 			taskManagerMemoryMb = flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
 
+			userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 		} catch (Exception e) {
 			LOG.debug("Config couldn't be loaded from environment variable.", e);
 		}
@@ -200,6 +205,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
 		this.flinkConfiguration = conf;
+
+		userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 	}
 
 	public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
@@ -265,7 +272,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	 * Returns true if the descriptor has the job jars to include in the classpath.
 	 */
 	public boolean hasUserJarFiles(List<URL> requiredJarFiles) {
-		if (userJarFiles == null || userJarFiles.size() != requiredJarFiles.size()) {
+		if (userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED) {
+			return false;
+		}
+		if (userJarFiles.size() != requiredJarFiles.size()) {
 			return false;
 		}
 		try {
@@ -284,16 +294,14 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	 * Sets the user jar which is included in the system classloader of all nodes.
 	 */
 	public void setProvidedUserJarFiles(List<URL> userJarFiles) {
-		Set<File> localUserJarFiles = new HashSet<>(userJarFiles.size());
 		for (URL jarFile : userJarFiles) {
 			try {
-				localUserJarFiles.add(new File(jarFile.toURI()));
+				this.userJarFiles.add(new File(jarFile.toURI()));
 			} catch (URISyntaxException e) {
 				throw new IllegalArgumentException("Couldn't add local user jar: " + jarFile
 					+ " Currently only file:/// URLs are supported.");
 			}
 		}
-		this.userJarFiles = localUserJarFiles;
 	}
 
 	public String getDynamicPropertiesEncoded() {
@@ -603,22 +611,22 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
-		Set<File> effectiveShipFiles = new HashSet<>(shipFiles.size());
+		Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
 		for (File file : shipFiles) {
-			effectiveShipFiles.add(file.getAbsoluteFile());
+			systemShipFiles.add(file.getAbsoluteFile());
 		}
 
 		//check if there is a logback or log4j file
 		File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
 		final boolean hasLogback = logbackFile.exists();
 		if (hasLogback) {
-			effectiveShipFiles.add(logbackFile);
+			systemShipFiles.add(logbackFile);
 		}
 
 		File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
 		final boolean hasLog4j = log4jFile.exists();
 		if (hasLog4j) {
-			effectiveShipFiles.add(log4jFile);
+			systemShipFiles.add(log4jFile);
 			if (hasLogback) {
 				// this means there is already a logback configuration file --> fail
 				LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both
LOG4J and " +
@@ -626,13 +634,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
-		addLibFolderToShipFiles(effectiveShipFiles);
-
-		// add the user jar to the classpath of the to-be-created cluster
-		if (userJarFiles != null) {
-			effectiveShipFiles.addAll(userJarFiles);
-		}
-
+		addLibFolderToShipFiles(systemShipFiles);
 
 		// Set-up ApplicationSubmissionContext for the application
 
@@ -666,57 +668,39 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		// local resource map for Yarn
-		final Map<String, LocalResource> localResources = new HashMap<>(2 + effectiveShipFiles.size());
+		final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size()
+ userJarFiles.size());
 		// list of remote paths (after upload)
-		final List<Path> paths = new ArrayList<>(2 + effectiveShipFiles.size());
+		final List<Path> paths = new ArrayList<>(2 + systemShipFiles.size() + userJarFiles.size());
 		// ship list that enables reuse of resources for task manager containers
 		StringBuilder envShipFileList = new StringBuilder();
 
-		// upload and register ship files
-		final List<String> classPaths = new ArrayList<>();
-		for (File shipFile : effectiveShipFiles) {
-			LocalResource shipResources = Records.newRecord(LocalResource.class);
-
-			Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
-			Path remotePath =
-				Utils.setupLocalResource(fs, appId.toString(), shipLocalPath, shipResources, fs.getHomeDirectory());
-
-			paths.add(remotePath);
-
-			localResources.put(shipFile.getName(), shipResources);
-
-			if (shipFile.isDirectory()) {
-				// add directories to the classpath
-				java.nio.file.Path shipPath = shipFile.toPath();
-				final java.nio.file.Path parentPath = shipPath.getParent();
+		// upload and register ship files	
+		List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(),
paths, localResources, envShipFileList);
+		List<String> userClassPaths = uploadAndRegisterFiles(userJarFiles, fs, appId.toString(),
paths, localResources, envShipFileList);
 
-				Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
-					@Override
-					public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
-							throws IOException {
-						java.nio.file.Path relativePath = parentPath.relativize(file);
-
-						classPaths.add(relativePath.toString());
-
-						return FileVisitResult.CONTINUE;
-					}
-				});
-			} else {
-				// add files to the classpath
-				classPaths.add(shipFile.getName());
-			}
-
-			envShipFileList.append(remotePath).append(",");
+		if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
+			systemClassPaths.addAll(userClassPaths);
 		}
 
 		// normalize classpath by sorting
-		Collections.sort(classPaths);
+		Collections.sort(systemClassPaths);
+		Collections.sort(userClassPaths);
 
 		// classpath assembler
 		StringBuilder classPathBuilder = new StringBuilder();
-		for (String classPath : classPaths) {
+		if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
+			for (String userClassPath : userClassPaths) {
+				classPathBuilder.append(userClassPath).append(File.pathSeparator);
+			}
+		}
+		for (String classPath : systemClassPaths) {
 			classPathBuilder.append(classPath).append(File.pathSeparator);
 		}
+		if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
+			for (String userClassPath : userClassPaths) {
+				classPathBuilder.append(userClassPath).append(File.pathSeparator);
+			}
+		}
 
 		// Setup jar for ApplicationMaster
 		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
@@ -936,6 +920,51 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 		return report;
 	}
+	
+	private static List<String> uploadAndRegisterFiles(
+			Collection<File> shipFiles,
+			FileSystem fs,
+			String appId,
+			List<Path> remotePaths,
+			Map<String, LocalResource> localResources,
+			StringBuilder envShipFileList) throws IOException {
+		final List<String> classPaths = new ArrayList<>(2 + shipFiles.size());
+		for (File shipFile : shipFiles) {
+			LocalResource shipResources = Records.newRecord(LocalResource.class);
+
+			Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
+			Path remotePath =
+				Utils.setupLocalResource(fs, appId, shipLocalPath, shipResources, fs.getHomeDirectory());
+
+			remotePaths.add(remotePath);
+
+			localResources.put(shipFile.getName(), shipResources);
+
+			if (shipFile.isDirectory()) {
+				// add directories to the classpath
+				java.nio.file.Path shipPath = shipFile.toPath();
+				final java.nio.file.Path parentPath = shipPath.getParent();
+
+				Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
+					@Override
+					public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
+						throws IOException {
+						java.nio.file.Path relativePath = parentPath.relativize(file);
+
+						classPaths.add(relativePath.toString());
+
+						return FileVisitResult.CONTINUE;
+					}
+				});
+			} else {
+				// add files to the classpath
+				classPaths.add(shipFile.getName());
+			}
+
+			envShipFileList.append(remotePath).append(",");
+		}
+		return classPaths;
+	}
 
 	/**
 	 * Kills YARN application and stops YARN client.
@@ -1220,7 +1249,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 	}
 
-	protected void addLibFolderToShipFiles(Set<File> effectiveShipFiles) {
+	protected void addLibFolderToShipFiles(Collection<File> effectiveShipFiles) {
 		// Add lib folder to the ship files if the environment variable is set.
 		// This is for convenience when running from the command-line.
 		// (for other files users explicitly set the ship files)
@@ -1298,6 +1327,19 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		return amContainer;
 	}
 
+	private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(org.apache.flink.configuration.Configuration
config) {
+		String configuredUserJarInclusion = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
+		try {
+			return YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase());
+		} catch (IllegalArgumentException e) {
+			LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling
back to default ({}).",
+				YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
+				configuredUserJarInclusion,
+				YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
+			return YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
+		}
+	}
+
 	/**
 	 * Creates a YarnClusterClient; may be overriden in tests
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 071bb7d..8839c1e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -42,8 +42,26 @@ public class YarnConfigOptions {
 			key("yarn.appmaster.rpc.port")
 			.defaultValue(-1);
 
+	/**
+	 * Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning
+	 * in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on
+	 * their name ("ORDER"). Setting this to "DISABLED" includes the user-jars in the user class path instead.
+	 */
+	public static final ConfigOption<String> CLASSPATH_INCLUDE_USER_JAR =
+		key("yarn.per-job-cluster.include-user-jar")
+			.defaultValue("ORDER");
+	
+
 	// ------------------------------------------------------------------------
 
 	/** This class is not meant to be instantiated */
 	private YarnConfigOptions() {}
+
+	/** @see YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR */
+	public enum UserJarInclusion {
+		DISABLED,
+		FIRST,
+		LAST,
+		ORDER
+	}
 }


Mime
View raw message