flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-2298] Add option to pass a custom name for Flink on YARN
Date Wed, 01 Jul 2015 13:25:48 GMT
Repository: flink
Updated Branches:
  refs/heads/master bd3c8d525 -> a9dc4307d


[FLINK-2298] Add option to pass a custom name for Flink on YARN

This closes #876


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9dc4307
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9dc4307
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9dc4307

Branch: refs/heads/master
Commit: a9dc4307d9d04dc2864e0b80609a62b4d01f5097
Parents: bd3c8d5
Author: Robert Metzger <rmetzger@apache.org>
Authored: Tue Jun 30 16:56:30 2015 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed Jul 1 15:25:18 2015 +0200

----------------------------------------------------------------------
 docs/setup/yarn_setup.md                        |  2 ++
 .../org/apache/flink/client/CliFrontend.java    |  6 ++--
 .../flink/client/FlinkYarnSessionCli.java       |  9 +++++-
 .../runtime/yarn/AbstractFlinkYarnClient.java   |  9 ++++--
 .../flink/yarn/YARNSessionFIFOITCase.java       | 12 ++++++--
 .../org/apache/flink/yarn/FlinkYarnClient.java  | 32 ++++++++++++++------
 6 files changed, 50 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a9dc4307/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index a65c217..9787179 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -100,11 +100,13 @@ Usage:
      -D <arg>                        Dynamic properties
      -d,--detached                   Start detached
      -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB]
+     -nm,--name <arg>                Set a custom name for the application on YARN
      -q,--query                      Display available YARN resources (memory, cores)
      -qu,--queue <arg>               Specify YARN queue.
      -s,--slots <arg>                Number of slots per TaskManager
      -st,--streaming                 Start Flink in streaming mode
      -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]
+
 ~~~
 
 Please note that the Client requires the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variable to be set to read the YARN and HDFS configuration.

http://git-wip-us.apache.org/repos/asf/flink/blob/a9dc4307/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 241df2d..83bb99a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -134,8 +134,6 @@ public class CliFrontend {
 
 	private AbstractFlinkYarnCluster yarnCluster;
 
-
-
 	/**
 	 *
 	 * @throws Exception Thrown if teh configuration directory was not found, the configuration could not
@@ -744,7 +742,7 @@ public class CliFrontend {
 			// user wants to run Flink in YARN cluster.
 			CommandLine commandLine = options.getCommandLine();
 			AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);
-
+			flinkYarnClient.setName("Flink Application: " + programName);
 			if (flinkYarnClient == null) {
 				throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
 			}
@@ -763,7 +761,7 @@ public class CliFrontend {
 			}
 
 			try {
-				yarnCluster = flinkYarnClient.deploy("Flink Application: " + programName);
+				yarnCluster = flinkYarnClient.deploy();
 				yarnCluster.connectToCluster();
 			}
 			catch(Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9dc4307/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index c11edc7..83993f2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -74,6 +74,7 @@ public class FlinkYarnSessionCli {
 	private final Option SLOTS;
 	private final Option DETACHED;
 	private final Option STREAMING;
+	private final Option NAME;
 
 	/**
 	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
@@ -97,6 +98,7 @@ public class FlinkYarnSessionCli {
 		DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
 		DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
 		STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
+		NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
 	}
 
 	public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
@@ -220,6 +222,9 @@ public class FlinkYarnSessionCli {
 		if (cmd.hasOption(STREAMING.getOpt())) {
 			flinkYarnClient.setStreamingMode(true);
 		}
+		if(cmd.hasOption(NAME.getOpt())) {
+			flinkYarnClient.setName(cmd.getOptionValue(NAME.getOpt()));
+		}
 		return flinkYarnClient;
 	}
 
@@ -244,6 +249,7 @@ public class FlinkYarnSessionCli {
 		opt.addOption(DYNAMIC_PROPERTIES);
 		opt.addOption(DETACHED);
 		opt.addOption(STREAMING);
+		opt.addOption(NAME);
 		formatter.printHelp(" ", opt);
 	}
 
@@ -350,6 +356,7 @@ public class FlinkYarnSessionCli {
 		options.addOption(DYNAMIC_PROPERTIES);
 		options.addOption(DETACHED);
 		options.addOption(STREAMING);
+		options.addOption(NAME);
 	}
 
 	public int run(String[] args) {
@@ -393,7 +400,7 @@ public class FlinkYarnSessionCli {
 
 
 			try {
-				yarnCluster = flinkYarnClient.deploy(null);
+				yarnCluster = flinkYarnClient.deploy();
 				// only connect to cluster if its not a detached session.
 				if(!flinkYarnClient.isDetached()) {
 					yarnCluster.connectToCluster();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9dc4307/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
index 0e307e0..65ae2be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
@@ -115,9 +115,8 @@ public abstract class AbstractFlinkYarnClient {
 	/**
 	 * Trigger the deployment to YARN.
 	 *
-	 * @param clusterName Name to be shown in the YARN resource manager overview.
 	 */
-	public abstract AbstractFlinkYarnCluster deploy(String clusterName) throws Exception;
+	public abstract AbstractFlinkYarnCluster deploy() throws Exception;
 
 	/**
 	 * @param detachedMode If true, the Flink YARN client is non-blocking. That means it returns
@@ -138,4 +137,10 @@ public abstract class AbstractFlinkYarnClient {
 	 * @param streamingMode
 	 */
 	public abstract  void setStreamingMode(boolean streamingMode);
+
+	/**
+	 * Set a name for the YARN application
+	 * @param name
+	 */
+	public abstract void setName(String name);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9dc4307/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index dd32b0d..ac1f971 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -121,6 +121,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 						"-n", "1",
 						"-jm", "768",
 						"-tm", "1024",
+						"--name", "MyCustomName", // test setting a custom name
 						"--detached"},
 				"Flink JobManager is now running on", RunTypes.YARN_SESSION);
 
@@ -142,7 +143,9 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			yc.start();
 			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
 			Assert.assertEquals(1, apps.size()); // Only one running
-			ApplicationId id = apps.get(0).getApplicationId();
+			ApplicationReport app = apps.get(0);
+			Assert.assertEquals("MyCustomName", app.getName());
+			ApplicationId id = app.getApplicationId();
 			yc.killApplication(id);
 
 			while(yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
@@ -166,6 +169,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 				"-n", "1",
 				"-jm", "768",
 				"-tm", "1024",
+				"-nm", "customName",
 				"-Dfancy-configuration-value=veryFancy",
 				"-Dyarn.maximum-failed-containers=3"},
 				"Number of connected TaskManagers changed to 1. Slots available: 1",
@@ -180,7 +184,9 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			yc.start();
 			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
 			Assert.assertEquals(1, apps.size()); // Only one running
-			String url = apps.get(0).getTrackingUrl();
+			ApplicationReport app = apps.get(0);
+			Assert.assertEquals("customName", app.getName());
+			String url = app.getTrackingUrl();
 			if(!url.endsWith("/")) {
 				url += "/";
 			}
@@ -615,7 +621,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		// deploy
 		AbstractFlinkYarnCluster yarnCluster = null;
 		try {
-			yarnCluster = flinkYarnClient.deploy(null);
+			yarnCluster = flinkYarnClient.deploy();
 			yarnCluster.connectToCluster();
 		} catch (Exception e) {
 			System.err.println("Error while deploying YARN cluster: "+e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9dc4307/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index f82f013..deb4809 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -143,6 +143,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 	private boolean detached;
 	private boolean streamingMode;
 
+	private String customName = null;
 
 	public FlinkYarnClient() {
 		conf = new YarnConfiguration();
@@ -314,7 +315,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 		return detached;
 	}
 
-	public AbstractFlinkYarnCluster deploy(final String clusterName) throws Exception {
+	public AbstractFlinkYarnCluster deploy() throws Exception {
 
 		UserGroupInformation.setConfiguration(conf);
 		UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -327,11 +328,11 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 			return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() {
 				@Override
 				public AbstractFlinkYarnCluster run() throws Exception {
-					return deployInternal(clusterName);
+					return deployInternal();
 				}
 			});
 		} else {
-			return deployInternal(clusterName);
+			return deployInternal();
 		}
 	}
 
@@ -341,7 +342,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 	 * This method will block until the ApplicationMaster/JobManager have been
 	 * deployed on YARN.
 	 */
-	protected AbstractFlinkYarnCluster deployInternal(String clusterName) throws Exception {
+	protected AbstractFlinkYarnCluster deployInternal() throws Exception {
 		isReadyForDepoyment();
 
 		LOG.info("Using values:");
@@ -591,14 +592,17 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 		capability.setMemory(jobManagerMemoryMb);
 		capability.setVirtualCores(1);
 
-		if(clusterName == null) {
-			clusterName = "Flink session with "+taskManagerCount+" TaskManagers";
-		}
-		if(detached) {
-			clusterName += " (detached)";
+		String name;
+		if(customName == null) {
+			name = "Flink session with "+taskManagerCount+" TaskManagers";
+			if(detached) {
+				name += " (detached)";
+			}
+		} else {
+			name = customName;
 		}
 
-		appContext.setApplicationName(clusterName); // application name
+		appContext.setApplicationName(name); // application name
 		appContext.setApplicationType("Apache Flink");
 		appContext.setAMContainerSpec(amContainer);
 		appContext.setResource(capability);
@@ -734,6 +738,14 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 		this.streamingMode = streamingMode;
 	}
 
+	@Override
+	public void setName(String name) {
+		if(name == null) {
+			throw new IllegalArgumentException("The passed name is null");
+		}
+		customName = name;
+	}
+
 	public static class YarnDeploymentException extends RuntimeException {
 		public YarnDeploymentException() {
 		}


Mime
View raw message