flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject flink git commit: [FLINK-2248] add flag to disable sysout logging from cli
Date Fri, 31 Jul 2015 13:49:45 GMT
Repository: flink
Updated Branches:
  refs/heads/master a56aad74e -> ce622aa99


[FLINK-2248] add flag to disable sysout logging from cli

This closes #957.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce622aa9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce622aa9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce622aa9

Branch: refs/heads/master
Commit: ce622aa99f6ec6b4325f11dbd6211a40d97c48ed
Parents: a56aad7
Author: Sachin Goel <sachingoel0101@gmail.com>
Authored: Thu Jul 30 14:43:03 2015 +0530
Committer: Maximilian Michels <mxm@apache.org>
Committed: Fri Jul 31 15:49:02 2015 +0200

----------------------------------------------------------------------
 docs/apis/cli.md                                |   4 +
 .../org/apache/flink/client/CliFrontend.java    |   2 +-
 .../flink/client/cli/CliFrontendParser.java     |   8 ++
 .../apache/flink/client/cli/ProgramOptions.java |  13 +++
 .../flink/client/CliFrontendLoggingTest.java    | 113 +++++++++++++++++++
 .../apache/flink/client/testjar/WordCount.java  |   5 +-
 6 files changed, 141 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce622aa9/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index 1f550f3..c0ae9b1 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -58,6 +58,10 @@ The command line can be used to
         ./bin/flink run -p 16 ./examples/flink-java-examples-{{ site.version }}-WordCount.jar
\
                                 file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
+-   Run example program with flink log output disabled
+
+            ./bin/flink run -q ./examples/flink-java-examples-{{ site.version }}-WordCount.jar
+
 -   Run example program on a specific JobManager:
 
         ./bin/flink run -m myJMHost:6123 \

http://git-wip-us.apache.org/repos/asf/flink/blob/ce622aa9/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 71b78bd..9ef2d5f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -276,6 +276,7 @@ public class CliFrontend {
 			LOG.debug("User parallelism is set to {}", userParallelism);
 
 			Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(),
userParallelism);
+			client.setPrintStatusDuringExecution(options.getStdoutLogging());
 			LOG.debug("Client slots is set to {}", client.getMaxSlots());
 			if(client.getMaxSlots() != -1 && userParallelism == -1) {
 				logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+").
" +
@@ -604,7 +605,6 @@ public class CliFrontend {
 		LOG.info("Starting execution of program");
 		JobSubmissionResult execResult;
 		try {
-			client.setPrintStatusDuringExecution(true);
 			execResult = client.run(program, parallelism, wait);
 		}
 		catch (ProgramInvocationException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ce622aa9/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 2e66a97..ae8499b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -51,6 +51,10 @@ public class CliFrontendParser {
 			"The parallelism with which to run the program. Optional flag to override the default
value " +
 					"specified in the configuration.");
 
+	static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "Whether sysout"
+
+			" " +
+			"logging is required or not");
+
 	static final Option ARGS_OPTION = new Option("a", "arguments", true,
 			"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
 
@@ -81,6 +85,8 @@ public class CliFrontendParser {
 		PARALLELISM_OPTION.setRequired(false);
 		PARALLELISM_OPTION.setArgName("parallelism");
 
+		LOGGING_OPTION.setRequired(false);
+
 		ARGS_OPTION.setRequired(false);
 		ARGS_OPTION.setArgName("programArgs");
 		ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);
@@ -107,6 +113,7 @@ public class CliFrontendParser {
 		options.addOption(CLASS_OPTION);
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(ARGS_OPTION);
+		options.addOption(LOGGING_OPTION);
 
 		// also add the YARN options so that the parser can parse them
 		yarnSessionCLi.getYARNSessionCLIOptions(options);
@@ -116,6 +123,7 @@ public class CliFrontendParser {
 	private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options)
{
 		options.addOption(CLASS_OPTION);
 		options.addOption(PARALLELISM_OPTION);
+		options.addOption(LOGGING_OPTION);
 		return options;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce622aa9/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 5b24a41..c45da13 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -25,6 +25,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
 
 /**
  * Base class for command line options that refer to a JAR file program.
@@ -39,6 +40,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 	private final int parallelism;
 
+	private final boolean stdoutLogging;
+
 	protected ProgramOptions(CommandLine line) throws CliArgsException {
 		super(line);
 
@@ -77,6 +80,12 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		else {
 			parallelism = -1;
 		}
+
+		if(line.hasOption(LOGGING_OPTION.getOpt())){
+			stdoutLogging = false;
+		} else{
+			stdoutLogging = true;
+		}
 	}
 
 	public String getJarFilePath() {
@@ -94,4 +103,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 	public int getParallelism() {
 		return parallelism;
 	}
+
+	public boolean getStdoutLogging() {
+		return stdoutLogging;
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ce622aa9/flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java
new file mode 100644
index 0000000..157e070
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.apache.flink.client.CliFrontendTestUtils.getConfigDir;
+import static org.junit.Assert.fail;
+
+public class CliFrontendLoggingTest {
+
+	private static LocalFlinkMiniCluster cluster;
+	private static Configuration config;
+	private static String hostPort;
+	private ByteArrayOutputStream stream = new ByteArrayOutputStream();
+	private CliFrontend cli;
+	private PrintStream output;
+
+	@Before
+	public void setUp() throws Exception {
+		stream.reset();
+		output = System.out;
+		System.setOut(new PrintStream(stream));
+
+		config = new Configuration();
+		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
+		config.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false);
+		hostPort = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + ":" +
+				config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+
+		try {
+			cluster = new LocalFlinkMiniCluster(config, false, StreamingMode.BATCH_ONLY);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Setup of test actor system failed.");
+		}
+
+		cli = new CliFrontend(getConfigDir());
+	}
+
+	@After
+	public void shutDownActorSystem() {
+		System.setOut(output);
+		if(cluster != null){
+			cluster.shutdown();
+		}
+	}
+
+	@Test
+	public void verifyLogging(){
+		try {
+			int ret = cli.run(new String[]{"-m", hostPort, getTestJarPath()});
+			System.out.flush();
+			assert(ret == 0 && checkForLogs(stream.toString()));
+		} catch(Exception e){
+			e.printStackTrace();
+			fail(e.getMessage());
+		} finally {
+			if(cluster != null){
+				cluster.shutdown();
+			}
+		}
+	}
+
+	@Test
+	public void verifyNoLogging(){
+		try {
+			int ret = cli.run(new String[]{"-q", "-m", hostPort, getTestJarPath()});
+			System.out.flush();
+			assert(ret == 0 && !checkForLogs(stream.toString()));
+		} catch(Exception e){
+			e.printStackTrace();
+			fail(e.getMessage());
+		} finally {
+			if(cluster != null){
+				cluster.shutdown();
+			}
+		}
+	}
+
+	private boolean checkForLogs(String output){
+		return output.indexOf("RUNNING") >= 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce622aa9/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
index 827bc77..b4ff616 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
@@ -57,12 +57,11 @@ public class WordCount {
 		// emit result
 		if(fileOutput) {
 			counts.writeAsCsv(outputPath, "\n", " ");
+			// execute program
+			env.execute("WordCount Example");
 		} else {
 			counts.print();
 		}
-		
-		// execute program
-		env.execute("WordCount Example");
 	}
 	
 	// *************************************************************************


Mime
View raw message