flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-2954] Add config parameter for passing environment variables to YARN
Date Sat, 05 Dec 2015 14:26:58 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9849a5769 -> b9639eb54


[FLINK-2954] Add config parameter for passing environment variables to YARN

This closes #1409


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9639eb5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9639eb5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9639eb5

Branch: refs/heads/master
Commit: b9639eb54413f94aec0cf54e5811a2dfd41378f8
Parents: 9849a57
Author: Robert Metzger <rmetzger@apache.org>
Authored: Thu Nov 26 11:47:25 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Sat Dec 5 12:17:13 2015 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            | 10 ++++++
 .../flink/configuration/ConfigConstants.java    | 16 +++++++++-
 flink-dist/pom.xml                              |  6 ----
 flink-yarn-tests/pom.xml                        |  6 ++--
 .../java/org/apache/flink/yarn/UtilsTest.java   | 25 ++++++++++++++-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  2 +-
 .../apache/flink/yarn/FlinkYarnClientBase.java  |  5 ++-
 .../main/java/org/apache/flink/yarn/Utils.java  | 33 ++++++++++++--------
 .../flink/yarn/ApplicationMasterBase.scala      |  4 +--
 .../org/apache/flink/yarn/YarnJobManager.scala  |  6 +++-
 10 files changed, 83 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9639eb5/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 8abcc03..93e6870 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -415,6 +415,16 @@ so that the Flink client is able to pick those details up. This configuration
pa
 changing the default location of that file (for example for environments sharing a Flink

 installation between users)
 
+- `yarn.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.`
+will be passed as environment variables to the ApplicationMaster/JobManager process.
+For example, to pass `LD_LIBRARY_PATH` as an environment variable to the ApplicationMaster, set:
+
+      yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+
+
+- `yarn.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting custom
+environment variables for the TaskManager processes.
+
 ## High Availability Mode
 
 - `recovery.mode`: (Default 'standalone') Defines the recovery mode used for the cluster
execution. Currently,

http://git-wip-us.apache.org/repos/asf/flink/blob/b9639eb5/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 251ea9c..da1dfdd 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -243,6 +243,20 @@ public final class ConfigConstants {
 	 */
 	public static final String YARN_PROPERTIES_FILE_LOCATION = "yarn.properties-file.location";
 
+	/**
+	 * Prefix for passing custom environment variables to Flink's ApplicationMaster (JobManager).
+	 * For example, to pass LD_LIBRARY_PATH as an environment variable to the AppMaster, set:
+	 * 	yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+	 * in the flink-conf.yaml.
+	 */
+	public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env.";
+
+	/**
+	 * Similar to the {@link #YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows
+	 * setting custom environment variables for the TaskManager processes.
+	 */
+	public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env.";
+
 
 	// ------------------------ Hadoop Configuration ------------------------
 
@@ -678,7 +692,7 @@ public final class ConfigConstants {
 	 * The default path to the file containing the list of access privileged users and passwords.
 	 */
 	public static final String DEFAULT_WEB_ACCESS_FILE_PATH = null;
-	
+
 	// ------------------------------ Akka Values ------------------------------
 
 	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";

http://git-wip-us.apache.org/repos/asf/flink/blob/b9639eb5/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 3d556f6..e6b2fe0 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -109,12 +109,6 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-language-binding-generic</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-python</artifactId>
 			<version>${project.version}</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9639eb5/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index f444311..5954327 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -44,7 +44,7 @@ under the License.
 			<artifactId>flink-runtime</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-clients</artifactId>
@@ -156,6 +156,7 @@ under the License.
 							</filters>
 							<transformers>
 								<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
 							</transformers>
 							<relocations>
 								<relocation>
@@ -174,9 +175,6 @@ under the License.
 									<shadedPattern>org.apache.flink.hadoop.shaded.org.jboss.netty</shadedPattern>
 								</relocation>
 							</relocations>
-							<transformers>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
-							</transformers>
 						</configuration>
 					</execution>
 				</executions>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9639eb5/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
index 5c709b0..968e769 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
@@ -31,6 +31,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 public class UtilsTest {
 	private static final Logger LOG = LoggerFactory.getLogger(UtilsTest.class);
@@ -63,7 +64,7 @@ public class UtilsTest {
 		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
 
 		// test different configuration
-		Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf) );
+		Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
 
 		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "1000");
 		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.1");
@@ -97,6 +98,28 @@ public class UtilsTest {
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 
+	@Test
+	public void testGetEnvironmentVariables() {
+		Configuration testConf = new Configuration();
+		testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");
+
+		Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.",
testConf);
+
+		Assert.assertEquals(1, res.size());
+		Map.Entry<String, String> entry = res.entrySet().iterator().next();
+		Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
+		Assert.assertEquals("/usr/lib/native", entry.getValue());
+	}
+
+	@Test
+	public void testGetEnvironmentVariablesErroneous() {
+		Configuration testConf = new Configuration();
+		testConf.setString("yarn.application-master.env.", "/usr/lib/native");
+
+		Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.",
testConf);
+
+		Assert.assertEquals(0, res.size());
+	}
 
 	//
 	// --------------- Tools to test if a certain string has been logged with Log4j. -------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b9639eb5/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index db02848..69925f2 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -479,7 +479,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		}
 
 		// get temporary file for reading input data for wordcount example
-		File tmpInFile = null;
+		File tmpInFile;
 		try{
 			tmpInFile = tmp.newFile();
 			FileUtils.writeStringToFile(tmpInFile,WordCountData.TEXT);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9639eb5/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
index 74ef5c3..cda1cab 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
@@ -587,8 +587,11 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient
{
 
 		// Setup CLASSPATH for ApplicationMaster
 		Map<String, String> appMasterEnv = new HashMap<String, String>();
+		// set user specified app master environment variables
+		appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
flinkConfiguration));
+		// set classpath from YARN configuration
 		Utils.setupEnv(conf, appMasterEnv);
-		// set configuration values
+		// set Flink on YARN internal configuration values
 		appMasterEnv.put(FlinkYarnClient.ENV_TM_COUNT, String.valueOf(taskManagerCount));
 		appMasterEnv.put(FlinkYarnClient.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
 		appMasterEnv.put(FlinkYarnClient.FLINK_JAR_PATH, remotePathJar.toString() );

http://git-wip-us.apache.org/repos/asf/flink/blob/b9639eb5/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index ea49066..a5dc6ec 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -18,11 +18,11 @@
 package org.apache.flink.yarn;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.slf4j.Logger;
@@ -98,7 +98,7 @@ public final class Utils {
 		
 		Path dst = new Path(homedir, suffix);
 		
-		LOG.info("Copying from " + localRsrcPath + " to " + dst );
+		LOG.info("Copying from " + localRsrcPath + " to " + dst);
 		fs.copyFromLocalFile(localRsrcPath, dst);
 		registerLocalResource(fs, dst, appMasterJar);
 		return dst;
@@ -187,17 +187,6 @@ public final class Utils {
 		}
 	}
 
-	public static void logFilesInCurrentDirectory(final Logger logger) {
-		new File(".").list(new FilenameFilter() {
-			
-			@Override
-			public boolean accept(File dir, String name) {
-				logger.info(dir.getAbsolutePath() + "/" + name);
-				return true;
-			}
-		});
-	}
-	
 	/**
 	 * Copied method from org.apache.hadoop.yarn.util.Apps
 	 * It was broken by YARN-1824 (2.4.0) and fixed for 2.4.1
@@ -221,4 +210,22 @@ public final class Utils {
 	private Utils() {
 		throw new RuntimeException();
 	}
+
+	/**
+	 * Method to extract environment variables from the flinkConfiguration based on the given prefix String.
+	 *
+	 * @param envPrefix Prefix for the environment variables key
+	 * @param flinkConfiguration The Flink config to get the environment variable definition from
+	 * @return Map from environment variable name (config key with the prefix removed) to its value
+	 */
+	public static Map<String, String> getEnvironmentVariables(String envPrefix, org.apache.flink.configuration.Configuration
flinkConfiguration) {
+		Map<String, String> result  = new HashMap<>();
+		for(Map.Entry<String, String> entry: flinkConfiguration.toMap().entrySet()) {
+			if(entry.getKey().startsWith(envPrefix) && entry.getKey().length() > envPrefix.length())
{
+				// remove prefix
+				String key = entry.getKey().substring(envPrefix.length());
+				result.put(key, entry.getValue());
+			}
+		}
+		return result;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9639eb5/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
index 73fc951..6bb3852 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
@@ -117,7 +117,7 @@ abstract class ApplicationMasterBase {
 
       // if a web monitor shall be started, set the port to random binding
       if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
-        config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+        config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
       }
 
       val (actorSystem, jmActor, archiveActor, webMonitor) =
@@ -147,7 +147,7 @@ abstract class ApplicationMasterBase {
         jobManagerPort, webServerPort, slots, taskManagerCount,
         dynamicPropertiesEncodedString)
 
-      val hadoopConfig = new YarnConfiguration();
+      val hadoopConfig = new YarnConfiguration()
 
       // send "start yarn session" message to YarnJobManager.
       log.info("Starting YARN session on Job Manager.")

http://git-wip-us.apache.org/repos/asf/flink/blob/b9639eb5/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index c8a9480..4ba80ea 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -707,8 +707,12 @@ class YarnJobManager(
 
     ctx.setLocalResources(taskManagerLocalResources.asJava)
 
-    // Setup classpath for container ( = TaskManager )
+    // Setup classpath and environment variables for container ( = TaskManager )
     val containerEnv = new java.util.HashMap[String, String]()
+    // user defined TaskManager environment variables
+    containerEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
+      flinkConfiguration))
+    // YARN classpath
     Utils.setupEnv(yarnConf, containerEnv)
     containerEnv.put(FlinkYarnClientBase.ENV_CLIENT_USERNAME, yarnClientUsername)
     ctx.setEnvironment(containerEnv)


Mime
View raw message