flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-7107] [flip6] Add option to start a Flip-6 Yarn session cluster
Date Wed, 09 Aug 2017 09:58:40 GMT
Repository: flink
Updated Branches:
  refs/heads/master fe5de7e8e -> ff70cc3af


[FLINK-7107] [flip6] Add option to start a Flip-6 Yarn session cluster

The Flip-6 Yarn session cluster can now be started with yarn-session.sh --flip6. Per
default, the old Yarn application master will be started.

This closes #4465.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a570aa5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a570aa5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a570aa5c

Branch: refs/heads/master
Commit: a570aa5c0c7d9a18e6cdcb689dcad9ff173dc2ac
Parents: fe5de7e
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Aug 2 16:40:13 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Aug 9 11:56:02 2017 +0200

----------------------------------------------------------------------
 ...CliFrontendYarnAddressConfigurationTest.java |  3 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  5 +++-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 30 ++++++++++++++++----
 3 files changed, 30 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a570aa5c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 0d57e20..1fed554 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -364,7 +364,8 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger
{
 			// override cluster descriptor to replace the YarnClient
 			protected AbstractYarnClusterDescriptor getClusterDescriptor(
 					Configuration configuration,
-					String configurationDirecotry) {
+					String configurationDirecotry,
+					boolean flip6) {
 				return new TestingYarnClusterDescriptor(configuration, configurationDirecotry);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a570aa5c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index c7c25ff..8eef8f0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -184,7 +184,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		}
 
 		@Override
-		protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration,
String configurationDirectory) {
+		protected AbstractYarnClusterDescriptor getClusterDescriptor(
+			Configuration configuration,
+			String configurationDirectory,
+			boolean flip6) {
 			return new JarAgnosticClusterDescriptor(configuration, configurationDirectory);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a570aa5c/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 4a214e2..5d8abac 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.YarnClusterDescriptorV2;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -111,6 +112,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	private final Option slots;
 	private final Option detached;
 	private final Option zookeeperNamespace;
+	private final Option flip6;
 
 	/**
 	 * @deprecated Streaming mode has been deprecated without replacement. Set the
@@ -156,6 +158,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		streaming = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink
in streaming mode");
 		name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for
the application on YARN");
 		zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true,
"Namespace to create the Zookeeper sub-paths for high availability mode");
+		flip6 = new Option(shortPrefix + "f6", longPrefix + "flip6", false, "Specify this option
to start a Flip-6 Yarn session cluster.");
 
 		allOptions = new Options();
 		allOptions.addOption(flinkJar);
@@ -172,6 +175,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		allOptions.addOption(name);
 		allOptions.addOption(applicationId);
 		allOptions.addOption(zookeeperNamespace);
+		allOptions.addOption(flip6);
 	}
 
 	/**
@@ -266,7 +270,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 		AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(
 			configuration,
-			configurationDirectory);
+			configurationDirectory,
+			cmd.hasOption(flip6.getOpt()));
 
 		// Jar Path
 		Path localJarPath;
@@ -552,7 +557,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 					: config.getString(HighAvailabilityOptions.HA_CLUSTER_ID, applicationID);
 			config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
 
-			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(config, configurationDirectory);
+			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
+				config,
+				configurationDirectory,
+				cmdLine.hasOption(flip6.getOpt()));
 			return yarnDescriptor.retrieve(applicationID);
 		} else {
 			throw new UnsupportedOperationException("Could not resume a Yarn cluster.");
@@ -609,7 +617,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 		// Query cluster for metrics
 		if (cmd.hasOption(query.getOpt())) {
-			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(configuration, configurationDirectory);
+			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
+				configuration,
+				configurationDirectory,
+				cmd.hasOption(flip6.getOpt()));
 			String description;
 			try {
 				description = yarnDescriptor.getClusterDescription();
@@ -622,7 +633,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			return 0;
 		} else if (cmd.hasOption(applicationId.getOpt())) {
 
-			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(configuration, configurationDirectory);
+			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
+				configuration,
+				configurationDirectory,
+				cmd.hasOption(flip6.getOpt()));
 
 			//configure ZK namespace depending on the value passed
 			String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ?
@@ -764,7 +778,11 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
 	}
 
-	protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration,
String configurationDirectory) {
-		return new YarnClusterDescriptor(configuration, configurationDirectory);
+	protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration,
String configurationDirectory, boolean flip6) {
+		if (flip6) {
+			return new YarnClusterDescriptorV2(configuration, configurationDirectory);
+		} else {
+			return new YarnClusterDescriptor(configuration, configurationDirectory);
+		}
 	}
 }


Mime
View raw message