flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/2] flink git commit: [FLINK-2797][cli] Add support for running jobs in detached mode from CLI
Date Fri, 13 Nov 2015 15:52:23 GMT
[FLINK-2797][cli] Add support for running jobs in detached mode from CLI

This closes #1214.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7cf642b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7cf642b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7cf642b

Branch: refs/heads/master
Commit: b7cf642bca109dced7dc1a02e831405887d7a9a6
Parents: 30647a2
Author: Sachin Goel <sachingoel0101@gmail.com>
Authored: Fri Oct 2 18:11:09 2015 +0530
Committer: Maximilian Michels <mxm@apache.org>
Committed: Fri Nov 13 16:28:38 2015 +0100

----------------------------------------------------------------------
 docs/apis/cli.md                                | 10 +++++--
 .../org/apache/flink/client/CliFrontend.java    | 29 +++++++++++++-------
 .../flink/client/cli/CliFrontendParser.java     |  6 ++++
 .../apache/flink/client/cli/ProgramOptions.java | 16 +++++++----
 .../apache/flink/client/CliFrontendRunTest.java | 21 ++++++++++----
 5 files changed, 59 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index 6bd2352..dfa1baf 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -62,6 +62,10 @@ The command line can be used to
 
             ./bin/flink run -q ./examples/WordCount.jar
 
+-   Run example program in detached mode
+
+            ./bin/flink run -d ./examples/WordCount.jar
+
 -   Run example program on a specific JobManager:
 
         ./bin/flink run -m myJMHost:6123 \
@@ -128,14 +132,16 @@ Action "run" compiles and runs a program.
                                       program. Optional flag to override the
                                       default value specified in the
                                       configuration.
-     -q --sysoutLogging               Specfying this flag will disable log messages
+     -q --sysoutLogging               Specifying this flag will disable log messages
                                       being reported on the console. All messages
                                       however will still be logged by SLF4J loggers,
                                       regardless of this setting.
+     -d --detached                    Specifying this option will run the job in
+                                      detached mode.
 
   Additional arguments if -m yarn-cluster is set:
      -yD <arg>                            Dynamic properties
-     -yd,--yarndetached                   Start detached
+     -yd,--yarndetached                   Start detached [consider using -d flag above]
      -yj,--yarnjar <arg>                  Path to Flink jar file
      -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                           MB]

http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 933a22c..e93025f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -296,7 +296,7 @@ public class CliFrontend {
 			int userParallelism = options.getParallelism();
 			LOG.debug("User parallelism is set to {}", userParallelism);
 
-			Client client = getClient(options, program.getMainClassName(), userParallelism);
+			Client client = getClient(options, program.getMainClassName(), userParallelism, options.getDetachedMode());
 			client.setPrintStatusDuringExecution(options.getStdoutLogging());
 			LOG.debug("Client slots is set to {}", client.getMaxSlots());
 
@@ -307,16 +307,11 @@ public class CliFrontend {
 					userParallelism = client.getMaxSlots();
 				}
 
-				// check if detached per job yarn cluster is used to start flink
-				if (yarnCluster != null && yarnCluster.isDetached()) {
-					logAndSysout("The Flink YARN client has been started in detached mode. In order to stop
" +
-							"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-							"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
-							"Please also note that the temporary files of the YARN session in the home directoy
will not be removed.");
+				// detached mode
+				if (options.getDetachedMode() || (yarnCluster != null && yarnCluster.isDetached()))
{
 					exitCode = executeProgramDetached(program, client, userParallelism);
 				}
 				else {
-					// regular (blocking) execution.
 					exitCode = executeProgramBlocking(program, client, userParallelism);
 				}
 
@@ -638,6 +633,14 @@ public class CliFrontend {
 	// --------------------------------------------------------------------------------------------
 
 	protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism)
{
+		// log message for detached yarn job
+		if (yarnCluster != null) {
+			logAndSysout("The Flink YARN client has been started in detached mode. In order to stop
" +
+					"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+					"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
+					"Please also note that the temporary files of the YARN session in the home directoy
will not be removed.");
+		}
+
 		LOG.info("Starting execution of program");
 
 		JobSubmissionResult result;
@@ -649,7 +652,7 @@ public class CliFrontend {
 			program.deleteExtractedLibraries();
 		}
 
-		if (yarnCluster != null && yarnCluster.isDetached()) {
+		if (yarnCluster != null) {
 			yarnCluster.stopAfterJob(result.getJobID());
 			yarnCluster.disconnect();
 		}
@@ -796,7 +799,8 @@ public class CliFrontend {
 	protected Client getClient(
 			CommandLineOptions options,
 			String programName,
-			int userParallelism)
+			int userParallelism,
+			boolean detachedMode)
 		throws Exception {
 		InetSocketAddress jobManagerAddress;
 		int maxSlots = -1;
@@ -811,6 +815,11 @@ public class CliFrontend {
 				throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
 			}
 			flinkYarnClient.setName("Flink Application: " + programName);
+			// in case the main detached mode wasn't set, we don't wanna overwrite the one loaded
+			// from yarn options.
+			if (detachedMode) {
+				flinkYarnClient.setDetachedMode(true);
+			}
 
 			// the number of slots available from YARN:
 			int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();

http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 028aead..1226d48 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -60,6 +60,9 @@ public class CliFrontendParser {
 	static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present,
" +
 			"supress logging output to standard out.");
 
+	static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs
" +
+			"the job in detached mode");
+
 	static final Option ARGS_OPTION = new Option("a", "arguments", true,
 			"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
 
@@ -94,6 +97,7 @@ public class CliFrontendParser {
 		PARALLELISM_OPTION.setArgName("parallelism");
 
 		LOGGING_OPTION.setRequired(false);
+		DETACHED_OPTION.setRequired(false);
 
 		ARGS_OPTION.setRequired(false);
 		ARGS_OPTION.setArgName("programArgs");
@@ -123,6 +127,7 @@ public class CliFrontendParser {
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(ARGS_OPTION);
 		options.addOption(LOGGING_OPTION);
+		options.addOption(DETACHED_OPTION);
 
 		// also add the YARN options so that the parser can parse them
 		yarnSessionCLi.getYARNSessionCLIOptions(options);
@@ -134,6 +139,7 @@ public class CliFrontendParser {
 		options.addOption(CLASSPATH_OPTION);
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(LOGGING_OPTION);
+		options.addOption(DETACHED_OPTION);
 		return options;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 11382d2..499d3ca 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
@@ -49,6 +50,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 	private final boolean stdoutLogging;
 
+	private final boolean detachedMode;
+
 	protected ProgramOptions(CommandLine line) throws CliArgsException {
 		super(line);
 
@@ -100,11 +103,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 			parallelism = -1;
 		}
 
-		if(line.hasOption(LOGGING_OPTION.getOpt())){
-			stdoutLogging = false;
-		} else{
-			stdoutLogging = true;
-		}
+		stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
+		detachedMode = line.hasOption(DETACHED_OPTION.getOpt());
 	}
 
 	public String getJarFilePath() {
@@ -130,4 +130,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 	public boolean getStdoutLogging() {
 		return stdoutLogging;
 	}
-}
\ No newline at end of file
+
+	public boolean getDetachedMode() {
+		return detachedMode;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index f910312..64c2709 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -52,21 +52,28 @@ public class CliFrontendRunTest {
 			// test without parallelism
 			{
 				String[] parameters = {"-v", getTestJarPath()};
-				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(-1, true);
+				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(-1, true, false);
 				assertEquals(0, testFrontend.run(parameters));
 			}
 
 			// test configure parallelism
 			{
 				String[] parameters = {"-v", "-p", "42",  getTestJarPath()};
-				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(42, true);
+				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(42, true, false);
 				assertEquals(0, testFrontend.run(parameters));
 			}
 
 			// test configure sysout logging
 			{
 				String[] parameters = {"-p", "2", "-q", getTestJarPath()};
-				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false);
+				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, false);
+				assertEquals(0, testFrontend.run(parameters));
+			}
+
+			// test detached mode
+			{
+				String[] parameters = {"-p", "2", "-d", getTestJarPath()};
+				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, true);
 				assertEquals(0, testFrontend.run(parameters));
 			}
 
@@ -96,15 +103,18 @@ public class CliFrontendRunTest {
 		
 		private final int expectedParallelism;
 		private final boolean sysoutLogging;
+		private final boolean isDetached;
 		
-		public RunTestingCliFrontend(int expectedParallelism, boolean logging) throws Exception
{
+		public RunTestingCliFrontend(int expectedParallelism, boolean logging, boolean isDetached)
throws Exception {
 			super(CliFrontendTestUtils.getConfigDir());
 			this.expectedParallelism = expectedParallelism;
 			this.sysoutLogging = logging;
+			this.isDetached = isDetached;
 		}
 
 		@Override
 		protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism)
{
+			assertTrue(isDetached);
 			assertEquals(this.expectedParallelism, parallelism);
 			assertEquals(this.sysoutLogging, client.getPrintStatusDuringExecution());
 			return 0;
@@ -112,11 +122,12 @@ public class CliFrontendRunTest {
 
 		@Override
 		protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism)
{
+			assertTrue(!isDetached);
 			return 0;
 		}
 
 		@Override
-		protected Client getClient(CommandLineOptions options, String programName, int userParallelism)
throws Exception {
+		protected Client getClient(CommandLineOptions options, String programName, int userParallelism,
boolean detached) throws Exception {
 			return Mockito.mock(Client.class);
 		}
 	}


Mime
View raw message