flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [1/3] flink git commit: [FLINK-2021] Rework examples to use ParameterTool
Date Tue, 23 Feb 2016 18:36:01 GMT
Repository: flink
Updated Branches:
  refs/heads/master c40cfd317 -> 0629e2560


http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
index 2c0bcda..8080c46 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.examples.socket;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
@@ -33,7 +34,7 @@ import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
  * </p>
  * <p>
  * Usage:
- * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
+ * <code>SocketTextStreamWordCount --hostname &lt;name&gt; --port &lt;n&gt; [--output &lt;path&gt;]</code>
  * </p>
  * <p>
  * This example shows how to:
@@ -46,18 +47,25 @@ import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
  * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
  */
 public class SocketTextStreamWordCount {
+
 	public static void main(String[] args) throws Exception {
 
-		if (!parseParameters(args)) {
+		// Checking input parameters
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		if (!params.has("hostname") || !params.has("port")) {
+			System.err.println("Usage: SocketTextStreamWordCount --hostname <name> --port <n> [--output <path>]");
 			return;
 		}
 
 		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.getExecutionEnvironment();
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(params);
 
 		// get input data
-		DataStream<String> text = env.socketTextStream(hostName, port, '\n', 0);
+		DataStream<String> text =
+				env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0);
 
 		DataStream<Tuple2<String, Integer>> counts =
 				// split up the lines in pairs (2-tuples) containing: (word,1)
@@ -66,9 +74,10 @@ public class SocketTextStreamWordCount {
 						.keyBy(0)
 						.sum(1);
 
-		if (fileOutput) {
-			counts.writeAsText(outputPath);
+		if (params.has("output")) {
+			counts.writeAsText(params.get("output"));
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			counts.print();
 		}
 
@@ -76,30 +85,4 @@ public class SocketTextStreamWordCount {
 		env.execute("WordCount from SocketTextStream Example");
 	}
 
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String hostName;
-	private static int port;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		// parse input arguments
-		if (args.length == 3) {
-			fileOutput = true;
-			hostName = args[0];
-			port = Integer.valueOf(args[1]);
-			outputPath = args[2];
-		} else if (args.length == 2) {
-			hostName = args[0];
-			port = Integer.valueOf(args[1]);
-		} else {
-			System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
-			return false;
-		}
-		return true;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index d26dc42..c47936c 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.examples.twitter;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
@@ -35,7 +36,7 @@ import java.util.StringTokenizer;
  * The input is a JSON text file with lines separated by newline characters.
  * </p>
  * <p>
- * Usage: <code>TwitterStream &lt;text path&gt;</code><br>
+ * Usage: <code>TwitterStream [--output &lt;path&gt;] [--props &lt;path&gt;]</code><br>
  * If no parameters are provided, the program is run with default data from
  * {@link TwitterStreamData}.
  * </p>
@@ -54,15 +55,28 @@ public class TwitterStream {
 	// *************************************************************************
 
 	public static void main(String[] args) throws Exception {
-		if (!parseParameters(args)) {
-			return;
-		}
+
+		// Checking input parameters
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		System.out.println("Usage: TwitterStream --output <path> --props <path>");
 
 		// set up the execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(params);
+
 		// get input data
-		DataStream<String> streamSource = getTextDataStream(env);
+		DataStream<String> streamSource;
+		if (params.has("props")) {
+			// create the Twitter source from the properties file at the given path
+			streamSource = env.addSource(new TwitterSource(params.get("props")));
+		} else {
+			System.out.println("Executing TwitterStream example with default props.");
+			System.out.println("Use --props to specify the path to the authentication info.");
+			// get default test text data
+			streamSource = env.fromElements(TwitterStreamData.TEXTS);
+		}
 
 		DataStream<Tuple2<String, Integer>> tweets = streamSource
 				// selecting English tweets and splitting to (word, 1)
@@ -71,9 +85,10 @@ public class TwitterStream {
 				.keyBy(0).sum(1);
 
 		// emit result
-		if (fileOutput) {
-			tweets.writeAsText(outputPath);
+		if (params.has("output")) {
+			tweets.writeAsText(params.get("output"));
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			tweets.print();
 		}
 
@@ -121,44 +136,4 @@ public class TwitterStream {
 		}
 	}
 
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileInput = false;
-	private static boolean fileOutput = false;
-	private static String propertiesPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				fileInput = true;
-				propertiesPath = args[0];
-				outputPath = args[1];
-			} else if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("USAGE:\nTwitterStream [<pathToPropertiesFile>] <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing TwitterStream example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  USAGE: TwitterStream [<pathToPropertiesFile>] <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-		if (fileInput) {
-			// read the text file from given input path
-			return env.addSource(new TwitterSource(propertiesPath));
-		} else {
-			// get default test text data
-			return env.fromElements(TwitterStreamData.TEXTS);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index e2df160..50c31a3 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.examples.windowing;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -38,14 +39,16 @@ public class SessionWindowing {
 	@SuppressWarnings("serial")
 	public static void main(String[] args) throws Exception {
 
-		if (!parseParameters(args)) {
-			return;
-		}
-
+		final ParameterTool params = ParameterTool.fromArgs(args);
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		System.out.println("Usage: SessionWindowing --output <path>");
+
+		env.getConfig().setGlobalJobParameters(params);
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(1);
 
+		final boolean fileOutput = params.has("output");
+
 		final List<Tuple3<String, Long, Integer>> input = new ArrayList<>();
 
 		input.add(new Tuple3<>("a", 1L, 1));
@@ -88,8 +91,9 @@ public class SessionWindowing {
 				.sum(2);
 
 		if (fileOutput) {
-			aggregated.writeAsText(outputPath);
+			aggregated.writeAsText(params.get("output"));
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			aggregated.print();
 		}
 
@@ -161,26 +165,4 @@ public class SessionWindowing {
 		}
 	}
 
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			if (args.length == 1) {
-				fileOutput = true;
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: SessionWindowing <result path>");
-				return false;
-			}
-		}
-		return true;
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 9104416..595286b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.examples.windowing;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -51,21 +52,21 @@ public class TopSpeedWindowing {
 
 	public static void main(String[] args) throws Exception {
 
-		if (!parseParameters(args)) {
-			return;
-		}
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		System.err.println("Usage: TopSpeedWindowingExample --input <path> --output <path>");
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.getConfig().setGlobalJobParameters(params);
 
 		@SuppressWarnings({"rawtypes", "serial"})
 		DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
-
-		if (fileInput) {
-			carData = env.readTextFile(inputPath).map(new ParseCarData());
+		if (params.has("input")) {
+			carData = env.readTextFile(params.get("input")).map(new ParseCarData());
 		} else {
-			int numOfCars = 2;
-			carData = env.addSource(CarSource.create(numOfCars));
+			System.out.println("Executing TopSpeedWindowing example with default input data set.");
+			System.out.println("Use --input to specify file input.");
+			carData = env.addSource(CarSource.create(2));
 		}
 
 		int evictionSec = 10;
@@ -88,10 +89,10 @@ public class TopSpeedWindowing {
 						}, carData.getType().createSerializer(env.getConfig())))
 				.maxBy(1);
 
-		if (fileOutput) {
-			topSpeeds.print();
-			topSpeeds.writeAsText(outputPath);
+		if (params.has("output")) {
+			topSpeeds.writeAsText(params.get("output"));
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			topSpeeds.print();
 		}
 
@@ -170,28 +171,4 @@ public class TopSpeedWindowing {
 		}
 	}
 
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileInput = false;
-	private static boolean fileOutput = false;
-	private static String inputPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			if (args.length == 2) {
-				fileInput = true;
-				fileOutput = true;
-				inputPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: TopSpeedWindowingExample <input path> <output path>");
-				return false;
-			}
-		}
-		return true;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index f3d57bf..d6f7450 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.examples.windowing;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -30,7 +31,7 @@ import org.apache.flink.streaming.examples.wordcount.WordCount;
  * The input is a plain text file with lines separated by newline characters.
  * 
  * <p>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * Usage: <code>WindowWordCount --input &lt;path&gt; --output &lt;path&gt; --window &lt;n&gt; --slide &lt;n&gt;</code><br>
  * If no parameters are provided, the program is run with default data from
  * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
  *
@@ -45,25 +46,35 @@ import org.apache.flink.streaming.examples.wordcount.WordCount;
  */
 public class WindowWordCount {
 
-	// window parameters with default values
-	private static int windowSize = 250;
-	private static int slideSize = 150;
-
 	// *************************************************************************
 	// PROGRAM
 	// *************************************************************************
 
 	public static void main(String[] args) throws Exception {
 
-		if (!parseParameters(args)) {
-			return;
-		}
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		System.out.println("  Usage: WindowWordCount --input <path> --output <path> --window <n> --slide <n>");
 
 		// set up the execution environment
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		// get input data
-		DataStream<String> text = getTextDataStream(env);
+		DataStream<String> text;
+		if (params.has("input")) {
+			// read the text file from given input path
+			text = env.readTextFile(params.get("input"));
+		} else {
+			System.out.println("Executing WindowWordCount example with default input data set.");
+			System.out.println("Use --input to specify file input.");
+			// get default test text data
+			text = env.fromElements(WordCountData.WORDS);
+		}
+
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(params);
+
+		final int windowSize = params.getInt("window", 250);
+		final int slideSize = params.getInt("slide", 150);
 
 		DataStream<Tuple2<String, Integer>> counts =
 		// split up the lines in pairs (2-tuples) containing: (word,1)
@@ -75,9 +86,10 @@ public class WindowWordCount {
 				.sum(1);
 
 		// emit result
-		if (fileOutput) {
-			counts.writeAsText(outputPath);
+		if (params.has("output")) {
+			counts.writeAsText(params.get("output"));
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			counts.print();
 		}
 
@@ -85,48 +97,4 @@ public class WindowWordCount {
 		env.execute("WindowWordCount");
 	}
 
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length >= 2 && args.length <= 4) {
-				textPath = args[0];
-				outputPath = args[1];
-				if (args.length >= 3){
-					windowSize = Integer.parseInt(args[2]);
-
-					// if no slide size is specified use the
-					slideSize = args.length == 3 ? windowSize : Integer.parseInt(args[2]);
-				}
-			} else {
-				System.err.println("Usage: WindowWordCount <text path> <result path> [<window size>] [<slide size>]");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WindowWordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WindowWordCount <text path> <result path> [<window size>] [<slide size>]");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return env.fromElements(WordCountData.WORDS);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
index 9b0b63c..d023246 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.examples.wordcount;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -29,7 +30,7 @@ import org.apache.flink.util.Collector;
  * type, but a custom class.
  * 
  * <p>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * Usage: <code>PojoExample --input &lt;path&gt; --output &lt;path&gt;</code><br>
  * If no parameters are provided, the program is run with default data from
  * {@link WordCountData}.
  * 
@@ -49,15 +50,27 @@ public class PojoExample {
 
 	public static void main(String[] args) throws Exception {
 
-		if (!parseParameters(args)) {
-			return;
-		}
+		// Checking input parameters
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		System.out.println("Usage: PojoExample --input <path> --output <path>");
 
 		// set up the execution environment
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(params);
+
 		// get input data
-		DataStream<String> text = getTextDataStream(env);
+		DataStream<String> text;
+		if (params.has("input")) {
+			System.out.println("Executing WordCountPojo example with default input data set.");
+			System.out.println("Use --input to specify file input.");
+			// read the text file from given input path
+			text = env.readTextFile(params.get("input"));
+		} else {
+			// get default test text data
+			text = env.fromElements(WordCountData.WORDS);
+		}
 
 		DataStream<Word> counts =
 		// split up the lines into Word objects
@@ -65,9 +78,10 @@ public class PojoExample {
 		// group by the field word and sum up the frequency
 				.keyBy("word").sum("frequency");
 
-		if (fileOutput) {
-			counts.writeAsText(outputPath);
+		if (params.has("output")) {
+			counts.writeAsText(params.get("output"));
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			counts.print();
 		}
 
@@ -146,41 +160,4 @@ public class PojoExample {
 		}
 	}
 
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: PojoExample <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing PojoExample example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: PojoExample <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return env.fromElements(WordCountData.WORDS);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index a76671e..a143fa5 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.examples.wordcount;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -32,7 +33,7 @@ import org.apache.flink.util.Collector;
  * The input is a plain text file with lines separated by newline characters.
  * 
  * <p>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
  * If no parameters are provided, the program is run with default data from
  * {@link WordCountData}.
  * 
@@ -53,15 +54,27 @@ public class WordCount {
 
 	public static void main(String[] args) throws Exception {
 
-		if (!parseParameters(args)) {
-			return;
-		}
+		// Checking input parameters
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		System.out.println("Usage: WordCount --input <path> --output <path>");
 
 		// set up the execution environment
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(params);
+
 		// get input data
-		DataStream<String> text = getTextDataStream(env);
+		DataStream<String> text;
+		if (params.has("input")) {
+			// read the text file from given input path
+			text = env.readTextFile(params.get("input"));
+		} else {
+			System.out.println("Executing WordCount example with default input data set.");
+			System.out.println("Use --input to specify file input.");
+			// get default test text data
+			text = env.fromElements(WordCountData.WORDS);
+		}
 
 		DataStream<Tuple2<String, Integer>> counts =
 		// split up the lines in pairs (2-tuples) containing: (word,1)
@@ -70,9 +83,10 @@ public class WordCount {
 				.keyBy(0).sum(1);
 
 		// emit result
-		if (fileOutput) {
-			counts.writeAsText(outputPath);
+		if (params.has("output")) {
+			counts.writeAsText(params.get("output"));
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			counts.print();
 		}
 
@@ -108,41 +122,4 @@ public class WordCount {
 		}
 	}
 
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WordCount <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return env.fromElements(WordCountData.WORDS);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 42484e8..fcc763d 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.scala.examples.join
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
@@ -41,16 +42,16 @@ object WindowJoin {
 
   def main(args: Array[String]) {
 
-    if (!parseParameters(args)) {
-      return
-    }
+    val params = ParameterTool.fromArgs(args)
+    println("Usage: WindowJoin --grades <path> --salaries <path> --output <path>")
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.getConfig.setGlobalJobParameters(params)
 
-    //Create streams for grades and salaries by mapping the inputs to the corresponding objects
-    val grades = setGradesInput(env)
-    val salaries = setSalariesInput(env)
+    // Create streams for grades and salaries by mapping the inputs to the corresponding objects
+    val grades = setGradesDataStream(env, params)
+    val salaries = setSalariesDataStream(env, params)
 
     //Join the two input streams by name on the last 2 seconds every second and create new
     //Person objects containing both grade and salary
@@ -60,9 +61,10 @@ object WindowJoin {
         .window(SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
         .apply { (g, s) => Person(g.name, g.grade, s.salary) }
 
-    if (fileOutput) {
-      joined.writeAsText(outputPath)
+    if (params.has("output")) {
+      joined.writeAsText(params.get("output"))
     } else {
+      println("Printing result to stdout. Use --output to specify output path.")
       joined.print()
     }
 
@@ -105,51 +107,24 @@ object WindowJoin {
   // UTIL METHODS
   // *************************************************************************
 
-  private var fileInput: Boolean = false
-  private var fileOutput: Boolean = false
-
-  private var gradesPath: String = null
-  private var salariesPath: String = null
-  private var outputPath: String = null
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      if (args.length == 1) {
-        fileOutput = true
-        outputPath = args(0)
-      }
-      else if (args.length == 3) {
-        fileInput = true
-        fileOutput = true
-        gradesPath = args(0)
-        salariesPath = args(1)
-        outputPath = args(2)
-      } else {
-        System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> " +
-          "<input path 2> <result path>")
-        return false
-      }
-    } else {
-      System.out.println("Executing WindowJoin with generated data.")
-      System.out.println("  Provide parameter to write to file.")
-      System.out.println("  Usage: WindowJoin <result path>")
-    }
-    true
-  }
-
-  private def setGradesInput(env: StreamExecutionEnvironment) : DataStream[Grade] = {
-    if (fileInput) {
-      env.readTextFile(gradesPath).map(parseMap _ ).map(x => Grade(x._1, x._2, x._3))
+  private def setGradesDataStream(env: StreamExecutionEnvironment, params: ParameterTool) :
+                       DataStream[Grade] = {
+    if (params.has("grades")) {
+      env.readTextFile(params.get("grades")).map(parseMap _ ).map(x => Grade(x._1, x._2, x._3))
     } else {
+      println("Executing WindowJoin example with default grades data set.")
+      println("Use --grades to specify file input.")
       env.fromCollection(gradeStream).map(x => Grade(x._1, x._2, x._3))
     }
   }
 
-  private def setSalariesInput(env: StreamExecutionEnvironment) : DataStream[Salary] = {
-    if (fileInput) {
-      env.readTextFile(salariesPath).map(parseMap _).map(x => Salary(x._1, x._2, x._3))
-    }
-    else {
+  private def setSalariesDataStream(env: StreamExecutionEnvironment, params: ParameterTool) :
+                         DataStream[Salary] = {
+    if (params.has("salaries")) {
+      env.readTextFile(params.get("salaries")).map(parseMap _).map(x => Salary(x._1, x._2, x._3))
+    } else {
+      println("Executing WindowJoin example with default salaries data set.")
+      println("Use --salaries to specify file input.")
       env.fromCollection(salaryStream).map(x => Salary(x._1, x._2, x._3))
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
index b319a03..6b7499c 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
@@ -18,10 +18,9 @@
 
 package org.apache.flink.streaming.scala.examples.socket
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.scala._
 
-import scala.language.postfixOps
-
 /**
  * This example shows an implementation of WordCount with data from a text socket. 
  * To run the example make sure that the service providing the text data is already up and running.
@@ -35,7 +34,7 @@ import scala.language.postfixOps
  *
  * Usage:
  * {{{
- *   SocketTextStreamWordCount <hostname> <port> <output path>
+ *   SocketTextStreamWordCount --hostname <name> --port <n> [--output <path>]
  * }}}
  *
  * This example shows how to:
@@ -47,47 +46,30 @@ import scala.language.postfixOps
 object SocketTextStreamWordCount {
 
   def main(args: Array[String]) {
-    if (!parseParameters(args)) {
+
+    val params = ParameterTool.fromArgs(args)
+    if (!params.has("hostname") || !params.has("port")) {
+      println("Usage: SocketTextStreamWordCount --hostname <name> --port <n> [--output <path>]")
       return
     }
-
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.setGlobalJobParameters(params)
 
     //Create streams for names and ages by mapping the inputs to the corresponding objects
-    val text = env.socketTextStream(hostName, port)
+    val text = env.socketTextStream(params.get("hostname"), params.getInt("port"))
     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
       .map { (_, 1) }
       .keyBy(0)
       .sum(1)
 
-    if (fileOutput) {
-      counts.writeAsText(outputPath)
+    if (params.has("output")) {
+      counts.writeAsText(params.get("output"))
     } else {
-      counts print
+      println("Printing result to stdout. Use --output to specify output path.")
+      counts.print
     }
 
     env.execute("Scala SocketTextStreamWordCount Example")
   }
 
-  private def parseParameters(args: Array[String]): Boolean = {
-      if (args.length == 3) {
-        fileOutput = true
-        hostName = args(0)
-        port = args(1).toInt
-        outputPath = args(2)
-      } else if (args.length == 2) {
-        hostName = args(0)
-        port = args(1).toInt
-      } else {
-        System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]")
-        return false
-      }
-    true
-  }
-
-  private var fileOutput: Boolean = false
-  private var hostName: String = null
-  private var port: Int = 0
-  private var outputPath: String = null
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index c30e654..3a75a49 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.scala.examples.windowing
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
 import org.apache.flink.streaming.api.scala._
@@ -55,15 +56,25 @@ object TopSpeedWindowing {
   val triggerMeters = 50d
 
   def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
+
+    val params = ParameterTool.fromArgs(args)
+    println("Usage: TopSpeedWindowing --input <path> --output <path>")
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.setGlobalJobParameters(params)
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setParallelism(1)
 
-    val cars = setCarsInput(env)
+    val cars =
+      if (params.has("input")) {
+        env.readTextFile(params.get("input"))
+          .map(parseMap(_))
+          .map(x => CarEvent(x._1, x._2, x._3, x._4))
+      } else {
+        println("Executing TopSpeedWindowing example with default input data set.")
+        println("Use --input to specify file input.")
+        env.fromCollection(genCarStream())
+      }
 
     val topSeed = cars
       .assignAscendingTimestamps( _.time )
@@ -78,10 +89,11 @@ object TopSpeedWindowing {
 //          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
       .maxBy("speed")
 
-    if (fileOutput) {
-      topSeed.writeAsText(outputPath)
+    if (params.has("output")) {
+      topSeed.writeAsText(params.get("output"))
     } else {
-      topSeed.print
+      println("Printing result to stdout. Use --output to specify output path.")
+      topSeed.print()
     }
 
     env.execute("TopSpeedWindowing")
@@ -113,38 +125,4 @@ object TopSpeedWindowing {
     (record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
   }
 
-  // *************************************************************************
-  // UTIL METHODS
-  // *************************************************************************
-
-  var fileInput = false
-  var fileOutput = false
-  var inputPath : String = null
-  var outputPath : String = null
-
-  def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      if (args.length == 2) {
-        fileInput = true
-        fileOutput = true
-        inputPath = args(0)
-        outputPath = args(1)
-        true
-      } else {
-        System.err.println("Usage: TopSpeedWindowing <input path> <output path>")
-        false
-      }
-    } else {
-      true
-    }
-  }
-
-  private def setCarsInput(env: StreamExecutionEnvironment) : DataStream[CarEvent] = {
-    if (fileInput) {
-      env.readTextFile(inputPath).map(parseMap(_)).map(x => CarEvent(x._1, x._2, x._3, x._4))
-    } else {
-      env.fromCollection(genCarStream())
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
index d6e3538..81eaa2f 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
@@ -42,6 +42,8 @@ public class IterateExampleITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		IterateExample.main(new String[]{inputPath, resultPath});
+		IterateExample.main(new String[]{
+				"--input", inputPath,
+				"--output", resultPath});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
index e657b67..285c9e2 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
@@ -45,6 +45,9 @@ public class WindowJoinITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
+		WindowJoin.main(new String[]{
+				"--grades", gradesPath,
+				"--salaries", salariesPath,
+				"--output", resultPath});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java
index 83569dc..7097a10 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java
@@ -37,6 +37,6 @@ public class IncrementalLearningSkeletonITCase extends StreamingProgramTestBase
 
 	@Override
 	protected void testProgram() throws Exception {
-		IncrementalLearningSkeleton.main(new String[]{resultPath});
+		IncrementalLearningSkeleton.main(new String[]{"--output", resultPath});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
index 838834b..38696ca 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
@@ -24,7 +24,10 @@ public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
+		SocketTextStreamWordCount.main(new String[]{
+				"--hostname", HOST,
+				"--port", port.toString(),
+				"--output", resultPath});
 	}
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java
index 7850082..a57adb8 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java
@@ -36,7 +36,7 @@ public class TwitterStreamITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		TwitterStream.main(new String[]{resultPath});
+		TwitterStream.main(new String[]{"--output", resultPath});
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java
index 7f46be9..28255dc 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java
@@ -37,6 +37,6 @@ public class SessionWindowingITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		SessionWindowing.main(new String[]{resultPath});
+		SessionWindowing.main(new String[]{"--output", resultPath});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
index 37812c9..f15ab13 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
@@ -40,6 +40,8 @@ public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		TopSpeedWindowing.main(new String[]{textPath, resultPath});
+		TopSpeedWindowing.main(new String[]{
+				"--input", textPath,
+				"--output", resultPath});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java
index e7cce60..ef5cf1c 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java
@@ -45,6 +45,10 @@ public class WindowWordCountITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		WindowWordCount.main(new String[]{textPath, resultPath, windowSize, slideSize});
+		WindowWordCount.main(new String[]{
+				"--input", textPath,
+				"--output", resultPath,
+				"--window", windowSize,
+				"--slide", slideSize});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java
index 6e3c213..3c69670 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java
@@ -40,6 +40,8 @@ public class PojoExampleITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		PojoExample.main(new String[]{textPath, resultPath});
+		PojoExample.main(new String[]{
+				"--input", textPath,
+				"--output", resultPath});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java
index fcf568e..153a055 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java
@@ -40,6 +40,8 @@ public class WordCountITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		WordCount.main(new String[]{textPath, resultPath});
+		WordCount.main(new String[]{
+				"--input", textPath,
+				"--output", resultPath});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
index 08ce890..53c5919 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
@@ -45,6 +45,9 @@ public class WindowJoinITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
+		WindowJoin.main(new String[]{
+				"--grades", gradesPath,
+				"--salaries", salariesPath,
+				"--output", resultPath});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
index b3629ad..39345af 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
@@ -24,7 +24,10 @@ public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
+		SocketTextStreamWordCount.main(new String[]{
+				"--hostname", HOST,
+				"--port", port.toString(),
+				"--output", resultPath});
 	}
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
index ef4e47f..de4fd1b 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
@@ -39,7 +39,9 @@ public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		TopSpeedWindowing.main(new String[]{textPath, resultPath});
+		TopSpeedWindowing.main(new String[]{
+				"--input", textPath,
+				"--output", resultPath});
 
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index bc800a5..fd959af 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -102,7 +102,9 @@ public class HDFSTest {
 			
 			DopOneTestEnvironment.setAsContext();
 			try {
-				WordCount.main(new String[]{file.toString(), result.toString()});
+				WordCount.main(new String[]{
+						"--input", file.toString(),
+						"--output", result.toString()});
 			}
 			catch(Throwable t) {
 				t.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/ConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/ConnectedComponentsITCase.java
index 25e195e..d5b6b38 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/ConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/ConnectedComponentsITCase.java
@@ -47,7 +47,11 @@ public class ConnectedComponentsITCase extends JavaProgramTestBase {
 	
 	@Override
 	protected void testProgram() throws Exception {
-		ConnectedComponents.main(verticesPath, edgesPath, resultPath, "100");
+		ConnectedComponents.main(
+				"--vertices", verticesPath,
+				"--edges", edgesPath,
+				"--output", resultPath,
+				"--iterations", "100");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleBasicITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleBasicITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleBasicITCase.java
index 9c4a28d..4d61b37 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleBasicITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleBasicITCase.java
@@ -40,7 +40,9 @@ public class EnumTriangleBasicITCase extends JavaProgramTestBase {
 	
 	@Override
 	protected void testProgram() throws Exception {
-		EnumTriangles.main(new String[] { edgePath, resultPath });
+		EnumTriangles.main(new String[] {
+				"--edges", edgePath,
+				"--output", resultPath });
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java
index e3b722a..efd534c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/PageRankITCase.java
@@ -69,14 +69,24 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testPageRankSmallNumberOfIterations() throws Exception{
-		PageRank.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES + "", "3"});
+	public void testPageRankSmallNumberOfIterations() throws Exception {
+		PageRank.main(new String[]{
+				"--pages", verticesPath,
+				"--links", edgesPath,
+				"--output", resultPath,
+				"--numPages", PageRankData.NUM_VERTICES + "",
+				"--iterations", "3"});
 		expected =  PageRankData.RANKS_AFTER_3_ITERATIONS;
 	}
 
 	@Test
 	public void testPageRankWithConvergenceCriterion() throws Exception {
-		PageRank.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES + "", "1000"});
+		PageRank.main(new String[]{
+				"--pages", verticesPath,
+				"--links", edgesPath,
+				"--output", resultPath,
+				"--numPages", PageRankData.NUM_VERTICES + "",
+				"--iterations", "1000"});
 		expected = PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/TransitiveClosureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/TransitiveClosureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/TransitiveClosureITCase.java
index dce4c49..123f616 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/TransitiveClosureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/TransitiveClosureITCase.java
@@ -47,7 +47,10 @@ public class TransitiveClosureITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		TransitiveClosureNaive.main(edgesPath, resultPath, "5");
+		TransitiveClosureNaive.main(
+				"--edges", edgesPath,
+				"--output", resultPath,
+				"--iterations", "5");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WebLogAnalysisITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WebLogAnalysisITCase.java
index d67b0e0..b5ce8e4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WebLogAnalysisITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WebLogAnalysisITCase.java
@@ -44,6 +44,10 @@ public class WebLogAnalysisITCase extends JavaProgramTestBase {
 	}
 	@Override
 	protected void testProgram() throws Exception {
-		WebLogAnalysis.main(new String[] {docsPath, ranksPath, visitsPath, resultPath});
+		WebLogAnalysis.main(new String[] {
+				"--documents", docsPath,
+				"--ranks", ranksPath,
+				"--visits", visitsPath,
+				"--output", resultPath});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
index 01f1b00..3c47c5c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
@@ -40,6 +40,8 @@ public class WordCountITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		WordCount.main(new String[] { textPath, resultPath });
+		WordCount.main(new String[] {
+				"--input", textPath,
+				"--output", resultPath });
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java
index ef1eca0..f3a1d03 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java
@@ -48,7 +48,11 @@ public class ConnectedComponentsITCase extends JavaProgramTestBase {
 	
 	@Override
 	protected void testProgram() throws Exception {
-		ConnectedComponents.main(new String[] {verticesPath, edgesPath, resultPath, "100"});
+		ConnectedComponents.main(new String[] {
+				"--vertices", verticesPath,
+				"--edges", edgesPath,
+				"--output", resultPath,
+				"--iterations", "100"});
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleITCase.java
index e23cccf..9d5ffdf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleITCase.java
@@ -40,7 +40,9 @@ public class EnumTriangleITCase extends JavaProgramTestBase {
 	
 	@Override
 	protected void testProgram() throws Exception {
-		EnumTriangles.main(new String[] { edgePath, resultPath });
+		EnumTriangles.main(new String[] {
+				"--edges", edgePath,
+				"--output", resultPath });
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
index 6b9e550..e3e8936 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
@@ -72,14 +72,24 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testPageRankWithSmallNumberOfIterations() throws Exception {
-		PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"});
+		PageRankBasic.main(new String[] {
+				"--pages", verticesPath,
+				"--links", edgesPath,
+				"--output", resultPath,
+				"--numPages", PageRankData.NUM_VERTICES+"",
+				"--iterations", "3"});
 		expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
 	}
 
 	@Test
 	public void testPageRankWithConvergence() throws Exception {
 		// start with a very high number of iteration such that the dynamic convergence criterion must handle termination
-		PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
+		PageRankBasic.main(new String[] {
+				"--pages", verticesPath,
+				"--links", edgesPath,
+				"--output", resultPath,
+				"--numPages", PageRankData.NUM_VERTICES+"",
+				"--iterations", "1000"});
 		expected = PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java
index 09a4072..9804f4c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java
@@ -47,7 +47,10 @@ public class TransitiveClosureITCase extends JavaProgramTestBase {
 
     @Override
     protected void testProgram() throws Exception {
-        TransitiveClosureNaive.main(new String [] {edgesPath, resultPath, "5"});
+        TransitiveClosureNaive.main(new String [] {
+                "--edges", edgesPath,
+                "--output", resultPath,
+                "--iterations", "5"});
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
index d6b4372..b952bc2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
@@ -44,6 +44,10 @@ public class WebLogAnalysisITCase extends JavaProgramTestBase {
 	}
 	@Override
 	protected void testProgram() throws Exception {
-		WebLogAnalysis.main(new String[]{docsPath, ranksPath, visitsPath, resultPath});
+		WebLogAnalysis.main(new String[]{
+				"--documents", docsPath,
+				"--ranks", ranksPath,
+				"--visits", visitsPath,
+				"--output", resultPath});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
index 5c1c500..ce55910 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
@@ -46,6 +46,8 @@ public class WordCountITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		WordCount.main(new String[] { textPath, resultPath });
+		WordCount.main(new String[] {
+				"--input", textPath,
+				"--output", resultPath });
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
index dad2f12..9f9d42b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -48,7 +48,9 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			WordCount.main(new String[] {IN_FILE, OUT_FILE});
+			WordCount.main(new String[] {
+					"--input", IN_FILE,
+					"--output", OUT_FILE});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
@@ -64,7 +66,11 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			TPCHQuery3.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+			TPCHQuery3.main(new String[] {
+					"--lineitem", IN_FILE,
+					"--customer", IN_FILE,
+					"--orders", OUT_FILE,
+					"--output", "123"});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
@@ -80,7 +86,11 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+			KMeans.main(new String[] {
+				"--points", IN_FILE,
+				"--centroids", IN_FILE,
+				"--output", OUT_FILE,
+				"--iterations", "123"});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
@@ -96,7 +106,11 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			WebLogAnalysis.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+			WebLogAnalysis.main(new String[] {
+					"--documents", IN_FILE,
+					"--ranks", IN_FILE,
+					"--visits", OUT_FILE,
+					"--output", "123"});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
@@ -112,7 +126,11 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			ConnectedComponents.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+			ConnectedComponents.main(new String[] {
+					"--vertices", IN_FILE,
+					"--edges", IN_FILE,
+					"--output", OUT_FILE,
+					"--iterations", "123"});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
@@ -128,12 +146,17 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			PageRank.main(new String[]{IN_FILE, IN_FILE, OUT_FILE, "10", "123"});
+			PageRank.main(new String[]{
+					"--pages", IN_FILE,
+					"--links", IN_FILE,
+					"--output", OUT_FILE,
+					"--numPages", "10",
+					"--iterations", "123"});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
 			e.printStackTrace();
-			Assert.fail("PagaRank failed with an exception");
+			Assert.fail("PageRank failed with an exception");
 		}
 		dump(env.getPlan());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
index a9ade6a..0c14d82 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
@@ -103,7 +103,9 @@ public class JsonJobGraphGenerationTest {
 				TestingExecutionEnvironment.setAsNext(validator, parallelism);
 				
 				String tmpDir = ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH;
-				WordCount.main(new String[] {tmpDir, tmpDir});
+				WordCount.main(new String[] {
+						"--input", tmpDir,
+						"--output", tmpDir});
 			}
 			catch (AbortError ignored) {}
 		}
@@ -134,7 +136,11 @@ public class JsonJobGraphGenerationTest {
 				TestingExecutionEnvironment.setAsNext(validator, parallelism);
 
 				String tmpDir = ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH;
-				WebLogAnalysis.main(new String[] {tmpDir, tmpDir, tmpDir, tmpDir});
+				WebLogAnalysis.main(new String[] {
+						"--documents", tmpDir,
+						"--ranks", tmpDir,
+						"--visits", tmpDir,
+						"--output", tmpDir});
 			}
 			catch (AbortError ignored) {}
 		}
@@ -165,7 +171,11 @@ public class JsonJobGraphGenerationTest {
 				TestingExecutionEnvironment.setAsNext(validator, parallelism);
 
 				String tmpDir = ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH;
-				KMeans.main(new String[] {tmpDir, tmpDir, tmpDir, "100"});
+				KMeans.main(new String[] {
+					"--points", tmpDir,
+					"--centroids", tmpDir,
+					"--output", tmpDir,
+					"--iterations", "100"});
 			}
 			catch (AbortError ignored) {}
 
@@ -197,7 +207,11 @@ public class JsonJobGraphGenerationTest {
 				TestingExecutionEnvironment.setAsNext(validator, parallelism);
 
 				String tmpDir = ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH;
-				ConnectedComponents.main(tmpDir, tmpDir, tmpDir, "100");
+				ConnectedComponents.main(
+						"--vertices", tmpDir,
+						"--edges", tmpDir,
+						"--output", tmpDir,
+						"--iterations", "100");
 			}
 			catch (AbortError ignored) {}
 			
@@ -213,7 +227,7 @@ public class JsonJobGraphGenerationTest {
 	
 	private static interface JsonValidator {
 		
-		void validtateJson(String json) throws Exception;
+		void validateJson(String json) throws Exception;
 	}
 	
 	private static class GenericValidator implements JsonValidator {
@@ -227,7 +241,7 @@ public class JsonJobGraphGenerationTest {
 		}
 
 		@Override
-		public void validtateJson(String json) throws Exception {
+		public void validateJson(String json) throws Exception {
 			final Map<String, JsonNode> idToNode = new HashMap<>();
 			
 			// validate the produced JSON
@@ -330,7 +344,7 @@ public class JsonJobGraphGenerationTest {
 			JsonParser parser = new JsonFactory().createJsonParser(jsonPlan);
 			while (parser.nextToken() != null);
 			
-			validator.validtateJson(jsonPlan);
+			validator.validateJson(jsonPlan);
 			
 			throw new AbortError();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
index 57c82b5..6f02ab6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
@@ -49,7 +49,9 @@ public class PreviewPlanDumpTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			WordCount.main(new String[] {IN_FILE, OUT_FILE});
+			WordCount.main(new String[] {
+					"--input", IN_FILE,
+					"--output", OUT_FILE});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
@@ -65,7 +67,11 @@ public class PreviewPlanDumpTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			TPCHQuery3.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+			TPCHQuery3.main(new String[] {
+					"--lineitem", IN_FILE,
+					"--customer", IN_FILE,
+					"--orders", OUT_FILE,
+					"--output", "123"});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
@@ -81,7 +87,11 @@ public class PreviewPlanDumpTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+			KMeans.main(new String[] {
+				"--points", IN_FILE,
+				"--centroids", IN_FILE,
+				"--output", OUT_FILE,
+				"--iterations", "123"});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
@@ -97,7 +107,11 @@ public class PreviewPlanDumpTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			org.apache.flink.examples.java.relational.WebLogAnalysis.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+			org.apache.flink.examples.java.relational.WebLogAnalysis.main(new String[] {
+					"--documents", IN_FILE,
+					"--ranks", IN_FILE,
+					"--visits", OUT_FILE,
+					"--output", "123"});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
@@ -113,7 +127,11 @@ public class PreviewPlanDumpTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			ConnectedComponents.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+			ConnectedComponents.main(new String[] {
+					"--vertices", IN_FILE,
+					"--edges", IN_FILE,
+					"--output", OUT_FILE,
+					"--iterations", "123"});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
@@ -129,12 +147,18 @@ public class PreviewPlanDumpTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			PageRank.main(new String[]{IN_FILE, IN_FILE, OUT_FILE, "10", "123"});
+			// --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
+			PageRank.main(new String[]{
+					"--pages", IN_FILE,
+					"--links", IN_FILE,
+					"--output", OUT_FILE,
+					"--numPages", "10",
+					"--iterations", "123"});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
 			e.printStackTrace();
-			Assert.fail("PagaRank failed with an exception");
+			Assert.fail("PageRank failed with an exception");
 		}
 		dump(env.getPlan());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index ca3a38b..17224b9 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -425,7 +425,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
 				"-ytm", "1024",
 				"-ys", "2", // test requesting slots from YARN.
-				"--yarndetached", job, tmpInFile.getAbsoluteFile().toString() , tmpOutFolder.getAbsoluteFile().toString()},
+				"--yarndetached", job, "--input", tmpInFile.getAbsoluteFile().toString(), "--output", tmpOutFolder.getAbsoluteFile().toString()},
 			"Job has been submitted with JobID",
 			RunTypes.CLI_FRONTEND);
 


Mime
View raw message