flink-commits mailing list archives

From: rmetz...@apache.org
Subject: [2/3] flink git commit: [FLINK-2021] Rework examples to use ParameterTool
Date: Tue, 23 Feb 2016 18:36:02 GMT
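For readers skimming the hunks below: every file is reworked around the same pattern. Here is a minimal, self-contained Scala sketch of that pattern (the WordCount-style body is illustrative, not copied from any file in this commit):

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._

object ParameterToolSketch {
  def main(args: Array[String]): Unit = {
    // parse named --key value arguments instead of positional ones
    val params = ParameterTool.fromArgs(args)
    val env = ExecutionEnvironment.getExecutionEnvironment

    // make the parameters visible in the web interface
    env.getConfig.setGlobalJobParameters(params)

    // fall back to built-in data so the job still runs without arguments
    val text: DataSet[String] =
      if (params.has("input")) env.readTextFile(params.get("input"))
      else env.fromElements("to be or not to be")

    val counts = text
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    if (params.has("output")) {
      counts.writeAsCsv(params.get("output"), "\n", " ")
      env.execute("ParameterTool sketch")
    } else {
      counts.print()
    }
  }
}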
http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
index d31ee55..677ca1c 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFir
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.examples.java.relational.util.WebLogData;
 import org.apache.flink.util.Collector;
 
@@ -77,7 +78,7 @@ import org.apache.flink.util.Collector;
  * }</pre>
  * 
  * <p>
- * Usage: <code>WebLogAnalysis &lt;documents path&gt; &lt;ranks path&gt; &lt;visits path&gt; &lt;result path&gt;</code><br>
+ * Usage: <code>WebLogAnalysis --documents &lt;path&gt; --ranks &lt;path&gt; --visits &lt;path&gt; --output &lt;path&gt;</code><br>
  * If no parameters are provided, the program is run with default data from {@link WebLogData}.
  * 
  * <p>
@@ -98,16 +99,17 @@ public class WebLogAnalysis {
 	
 	public static void main(String[] args) throws Exception {
 		
-		if(!parseParameters(args)) {
-			return;
-		}
+		final ParameterTool params = ParameterTool.fromArgs(args);
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		System.out.println("Usage: WebLogAnalysis --documents <path> --ranks <path> --visits <path> --output <path>");
+
+		env.getConfig().setGlobalJobParameters(params);
 
 		// get input data
-		DataSet<Tuple2<String, String>> documents = getDocumentsDataSet(env);
-		DataSet<Tuple3<Integer, String, Integer>> ranks = getRanksDataSet(env);
-		DataSet<Tuple2<String, String>> visits = getVisitsDataSet(env);
+		DataSet<Tuple2<String, String>> documents = getDocumentsDataSet(env, params);
+		DataSet<Tuple3<Integer, String, Integer>> ranks = getRanksDataSet(env, params);
+		DataSet<Tuple2<String, String>> visits = getVisitsDataSet(env, params);
 		
 		// Retain documents with keywords
 		DataSet<Tuple1<String>> filterDocs = documents
@@ -136,11 +138,12 @@ public class WebLogAnalysis {
 								.with(new AntiJoinVisits());
 
 		// emit result
-		if(fileOutput) {
-			result.writeAsCsv(outputPath, "\n", "|");
+		if(params.has("output")) {
+			result.writeAsCsv(params.get("output"), "\n", "|");
 			// execute program
 			env.execute("WebLogAnalysis Example");
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			result.print();
 		}
 	}
@@ -259,65 +262,42 @@ public class WebLogAnalysis {
 	//     UTIL METHODS
 	// *************************************************************************
 	
-	private static boolean fileOutput = false;
-	private static String documentsPath;
-	private static String ranksPath;
-	private static String visitsPath;
-	private static String outputPath;
-	
-	private static boolean parseParameters(String[] args) {
-		
-		if(args.length > 0) {
-			fileOutput = true;
-			if(args.length == 4) {
-				documentsPath = args[0];
-				ranksPath = args[1];
-				visitsPath = args[2];
-				outputPath = args[3];
-			} else {
-				System.err.println("Usage: WebLogAnalysis <documents path> <ranks path> <visits path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WebLog Analysis example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  We provide a data generator to create synthetic input files for this program.");
-			System.out.println("  Usage: WebLogAnalysis <documents path> <ranks path> <visits path> <result path>");
-		}
-		return true;
-	}
-	
-	private static DataSet<Tuple2<String, String>> getDocumentsDataSet(ExecutionEnvironment env) {
+	private static DataSet<Tuple2<String, String>> getDocumentsDataSet(ExecutionEnvironment env, ParameterTool params) {
 		// Create DataSet for documents relation (URL, Doc-Text)
-		if(fileOutput) {
-			return env.readCsvFile(documentsPath)
+		if(params.has("documents")) {
+			return env.readCsvFile(params.get("documents"))
 						.fieldDelimiter("|")
 						.types(String.class, String.class);
 		} else {
+			System.out.println("Executing WebLogAnalysis example with default documents data set.");
+			System.out.println("Use --documents to specify file input.");
 			return WebLogData.getDocumentDataSet(env);
 		}
 	}
 	
-	private static DataSet<Tuple3<Integer, String, Integer>> getRanksDataSet(ExecutionEnvironment env) {
+	private static DataSet<Tuple3<Integer, String, Integer>> getRanksDataSet(ExecutionEnvironment env, ParameterTool params) {
 		// Create DataSet for ranks relation (Rank, URL, Avg-Visit-Duration)
-		if(fileOutput) {
-			return env.readCsvFile(ranksPath)
+		if(params.has("ranks")) {
+			return env.readCsvFile(params.get("ranks"))
 						.fieldDelimiter("|")
 						.types(Integer.class, String.class, Integer.class);
 		} else {
+			System.out.println("Executing WebLogAnalysis example with default ranks data set.");
+			System.out.println("Use --ranks to specify file input.");
 			return WebLogData.getRankDataSet(env);
 		}
 	}
 
-	private static DataSet<Tuple2<String, String>> getVisitsDataSet(ExecutionEnvironment env) {
+	private static DataSet<Tuple2<String, String>> getVisitsDataSet(ExecutionEnvironment env, ParameterTool params) {
 		// Create DataSet for visits relation (URL, Date)
-		if(fileOutput) {
-			return env.readCsvFile(visitsPath)
+		if(params.has("visits")) {
+			return env.readCsvFile(params.get("visits"))
 						.fieldDelimiter("|")
 						.includeFields("011000000")
 						.types(String.class, String.class);
 		} else {
+			System.out.println("Executing WebLogAnalysis example with default visits data set.");
+			System.out.println("Use --visits to specify file input.");
 			return WebLogData.getVisitDataSet(env);
 		}
 	}

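As an aside for readers new to the DataSet API, the readCsvFile calls above all follow one style: pipe-delimited input with a column mask. A hypothetical Scala sketch of the same thing (the /tmp path is made up):

import org.apache.flink.api.scala._

object CsvReadSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // keep only columns 1 and 2 of a '|'-separated file, the Scala
    // equivalent of .includeFields("011000000") in the Java hunk above
    val visits: DataSet[(String, String)] = env.readCsvFile[(String, String)](
      "/tmp/visits.csv", // hypothetical path
      fieldDelimiter = "|",
      includedFields = Array(1, 2))
    visits.print()
  }
}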
http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
index 3d88a6e..b914d77 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.util.Collector;
 
@@ -33,7 +34,7 @@ import org.apache.flink.util.Collector;
  * The input is a plain text file with lines separated by newline characters.
  * 
  * <p>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
  * If no parameters are provided, the program is run with default data from {@link WordCountData}.
  * 
  * <p>
@@ -54,15 +55,26 @@ public class WordCount {
 	
 	public static void main(String[] args) throws Exception {
 
-		if (!parseParameters(args)) {
-			return;
-		}
-		
+		final ParameterTool params = ParameterTool.fromArgs(args);
+
 		// set up the execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(params);
+		System.out.println("Usage: WordCount --input <path> --output <path>");
+
 		// get input data
-		DataSet<String> text = getTextDataSet(env);
+		DataSet<String> text;
+		if (params.has("input")) {
+			// read the text file from given input path
+			text = env.readTextFile(params.get("input"));
+		} else {
+			// get default test text data
+			System.out.println("Executing WordCount example with default input data set.");
+			System.out.println("Use --input to specify file input.");
+			text = WordCountData.getDefaultTextLineDataSet(env);
+		}
 
 		DataSet<Tuple2<String, Integer>> counts = 
 				// split up the lines in pairs (2-tuples) containing: (word,1)
@@ -72,14 +84,14 @@ public class WordCount {
 				.sum(1);
 
 		// emit result
-		if (fileOutput) {
-			counts.writeAsCsv(outputPath, "\n", " ");
+		if (params.has("output")) {
+			counts.writeAsCsv(params.get("output"), "\n", " ");
 			// execute program
 			env.execute("WordCount Example");
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			counts.print();
 		}
-		
 
 	}
 	
@@ -107,42 +119,5 @@ public class WordCount {
 			}
 		}
 	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-	
-	private static boolean parseParameters(String[] args) {
-		
-		if(args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if(args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WordCount <text path> <result path>");
-		}
-		return true;
-	}
-	
-	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return WordCountData.getDefaultTextLineDataSet(env);
-		}
-	}
+
 }

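Setting the parameters on the ExecutionConfig is not only for the web interface; user functions can read them back at runtime instead of relying on static fields. A sketch, assuming the job registered a ParameterTool as above (the "prefix" option is illustrative, not one used in this commit):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.utils.ParameterTool

// runs on the workers; reads the globally registered parameters back
class PrefixMapper extends RichMapFunction[String, String] {
  override def map(value: String): String = {
    val params = getRuntimeContext.getExecutionConfig
      .getGlobalJobParameters.asInstanceOf[ParameterTool]
    // "prefix" is an illustrative option name
    params.get("prefix", ">") + " " + value
  }
}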
http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java
index b0653ca..1ad15d8 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.util.Collector;
@@ -76,16 +77,27 @@ public class WordCountPojo {
 	}
 	
 	public static void main(String[] args) throws Exception {
-		if (!parseParameters(args)) {
-			return;
-		}
-		
+
+		final ParameterTool params = ParameterTool.fromArgs(args);
+
 		// set up the execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(params);
 		
 		// get input data
-		DataSet<String> text = getTextDataSet(env);
-		
+		DataSet<String> text;
+		if (params.has("input")) {
+			// read the text file from given input path
+			text = env.readTextFile(params.get("input"));
+		} else {
+			// get default test text data
+			System.out.println("Executing WordCount example with default input data set.");
+			System.out.println("Use --input to specify file input.");
+			text = WordCountData.getDefaultTextLineDataSet(env);
+		}
+
 		DataSet<Word> counts = 
 			// split up the lines into Word objects (with frequency = 1)
 			text.flatMap(new Tokenizer())
@@ -98,11 +110,12 @@ public class WordCountPojo {
 				}
 			});
 		
-		if (fileOutput) {
-			counts.writeAsText(outputPath, WriteMode.OVERWRITE);
+		if (params.has("output")) {
+			counts.writeAsText(params.get("output"), WriteMode.OVERWRITE);
 			// execute program
 			env.execute("WordCount-Pojo Example");
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			counts.print();
 		}
 
@@ -133,41 +146,4 @@ public class WordCountPojo {
 		}
 	}
 	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-	
-	private static boolean parseParameters(String[] args) {
-		
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WordCount <text path> <result path>");
-		}
-		return true;
-	}
-	
-	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return WordCountData.getDefaultTextLineDataSet(env);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
index 08a3e62..78ce45b 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
@@ -19,6 +19,7 @@ package org.apache.flink.examples.scala.clustering
 
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.examples.java.clustering.util.KMeansData
@@ -35,29 +36,29 @@ import scala.collection.JavaConverters._
  * Each point is assigned to the cluster center which is closest to it.
  * Subsequently, each cluster center is moved to the center (''mean'') of all points that have
  * been assigned to it.
- * The moved cluster centers are fed into the next iteration. 
- * The algorithm terminates after a fixed number of iterations (as in this implementation) 
+ * The moved cluster centers are fed into the next iteration.
+ * The algorithm terminates after a fixed number of iterations (as in this implementation)
  * or if cluster centers do not (significantly) move in an iteration.
  * This is the Wikipedia entry for the [[http://en.wikipedia
  * .org/wiki/K-means_clustering K-Means Clustering algorithm]].
  *
  * This implementation works on two-dimensional data points.
- * It computes an assignment of data points to cluster centers, i.e., 
+ * It computes an assignment of data points to cluster centers, i.e.,
  * each data point is annotated with the id of the final cluster (center) it belongs to.
  *
  * Input files are plain text files and must be formatted as follows:
  *
- *  - Data points are represented as two double values separated by a blank character.
- *    Data points are separated by newline characters.
- *    For example `"1.2 2.3\n5.3 7.2\n"` gives two data points (x=1.2, y=2.3) and (x=5.3,
- *    y=7.2).
- *  - Cluster centers are represented by an integer id and a point value.
- *    For example `"1 6.2 3.2\n2 2.9 5.7\n"` gives two centers (id=1, x=6.2,
- *    y=3.2) and (id=2, x=2.9, y=5.7).
+ * - Data points are represented as two double values separated by a blank character.
+ * Data points are separated by newline characters.
+ * For example `"1.2 2.3\n5.3 7.2\n"` gives two data points (x=1.2, y=2.3) and (x=5.3,
+ * y=7.2).
+ * - Cluster centers are represented by an integer id and a point value.
+ * For example `"1 6.2 3.2\n2 2.9 5.7\n"` gives two centers (id=1, x=6.2,
+ * y=3.2) and (id=2, x=2.9, y=5.7).
  *
  * Usage:
  * {{{
- *   KMeans <points path> <centers path> <result path> <num iterations>
+ *   KMeans --points <path> --centroids <path> --output <path> --iterations <n>
  * }}}
  * If no parameters are provided, the program is run with default data from
  * [[org.apache.flink.examples.java.clustering.util.KMeansData]]
@@ -65,23 +66,27 @@ import scala.collection.JavaConverters._
  *
  * This example shows how to use:
  *
- *  - Bulk iterations
- *  - Broadcast variables in bulk iterations
- *  - Custom Java objects (PoJos)
+ * - Bulk iterations
+ * - Broadcast variables in bulk iterations
+ * - Scala case classes
  */
 object KMeans {
 
   def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
 
-    val env = ExecutionEnvironment.getExecutionEnvironment
+    // checking input parameters
+    val params: ParameterTool = ParameterTool.fromArgs(args)
+    println("Usage: KMeans --points <path> --centroids <path> --output <path> --iterations <n>")
+
+    // set up execution environment
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
 
-    val points: DataSet[Point] = getPointDataSet(env)
-    val centroids: DataSet[Centroid] = getCentroidDataSet(env)
+    // get input data:
+    // read the points and centroids from the provided paths or fall back to default data
+    val points: DataSet[Point] = getPointDataSet(params, env)
+    val centroids: DataSet[Centroid] = getCentroidDataSet(params, env)
 
-    val finalCentroids = centroids.iterate(numIterations) { currentCentroids =>
+    val finalCentroids = centroids.iterate(params.getInt("iterations", 10)) { currentCentroids =>
       val newCentroids = points
         .map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids")
         .map { x => (x._1, x._2, 1L) }.withForwardedFields("_1; _2")
@@ -94,136 +99,106 @@ object KMeans {
     val clusteredPoints: DataSet[(Int, Point)] =
       points.map(new SelectNearestCenter).withBroadcastSet(finalCentroids, "centroids")
 
-    if (fileOutput) {
-      clusteredPoints.writeAsCsv(outputPath, "\n", " ")
+    if (params.has("output")) {
+      clusteredPoints.writeAsCsv(params.get("output"), "\n", " ")
       env.execute("Scala KMeans Example")
-    }
-    else {
+    } else {
+      println("Printing result to stdout. Use --output to specify output path.")
       clusteredPoints.print()
     }
 
   }
 
-  private def parseParameters(programArguments: Array[String]): Boolean = {
-    if (programArguments.length > 0) {
-      fileOutput = true
-      if (programArguments.length == 4) {
-        pointsPath = programArguments(0)
-        centersPath = programArguments(1)
-        outputPath = programArguments(2)
-        numIterations = Integer.parseInt(programArguments(3))
-
-        true
-      }
-      else {
-        System.err.println("Usage: KMeans <points path> <centers path> <result path> <num " +
-          "iterations>")
+  // *************************************************************************
+  //     UTIL FUNCTIONS
+  // *************************************************************************
 
-        false
-      }
-    }
-    else {
-      System.out.println("Executing K-Means example with default parameters and built-in default " +
-        "data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("  We provide a data generator to create synthetic input files for this " +
-        "program.")
-      System.out.println("  Usage: KMeans <points path> <centers path> <result path> <num " +
-        "iterations>")
-
-      true
+  def getCentroidDataSet(params: ParameterTool, env: ExecutionEnvironment): DataSet[Centroid] = {
+    if (params.has("centroids")) {
+      env.readCsvFile[Centroid](
+        params.get("centroids"),
+        fieldDelimiter = " ",
+        includedFields = Array(0, 1, 2))
+    } else {
+      println("Executing K-Means example with default centroid data set.")
+      println("Use --centroids to specify file input.")
+      env.fromCollection(KMeansData.CENTROIDS map {
+        case Array(id, x, y) =>
+          new Centroid(id.asInstanceOf[Int], x.asInstanceOf[Double], y.asInstanceOf[Double])
+      })
     }
   }
 
-  private def getPointDataSet(env: ExecutionEnvironment): DataSet[Point] = {
-    if (fileOutput) {
-      env.readCsvFile[(Double, Double)](
-        pointsPath,
+  def getPointDataSet(params: ParameterTool, env: ExecutionEnvironment): DataSet[Point] = {
+    if (params.has("points")) {
+      env.readCsvFile[Point](
+        params.get("points"),
         fieldDelimiter = " ",
         includedFields = Array(0, 1))
-        .map { x => new Point(x._1, x._2)}
-    }
-    else {
-      val points = KMeansData.POINTS map {
+    } else {
+      println("Executing K-Means example with default points data set.")
+      println("Use --points to specify file input.")
+      env.fromCollection(KMeansData.POINTS map {
         case Array(x, y) => new Point(x.asInstanceOf[Double], y.asInstanceOf[Double])
-      }
-      env.fromCollection(points)
+      })
     }
   }
 
-  private def getCentroidDataSet(env: ExecutionEnvironment): DataSet[Centroid] = {
-    if (fileOutput) {
-      env.readCsvFile[(Int, Double, Double)](
-        centersPath,
-        fieldDelimiter = " ",
-        includedFields = Array(0, 1, 2))
-        .map { x => new Centroid(x._1, x._2, x._3)}
-    }
-    else {
-      val centroids = KMeansData.CENTROIDS map {
-        case Array(id, x, y) =>
-          new Centroid(id.asInstanceOf[Int], x.asInstanceOf[Double], y.asInstanceOf[Double])
-      }
-      env.fromCollection(centroids)
-    }
-  }
-
-  private var fileOutput: Boolean = false
-  private var pointsPath: String = null
-  private var centersPath: String = null
-  private var outputPath: String = null
-  private var numIterations: Int = 10
+  // *************************************************************************
+  //     DATA TYPES
+  // *************************************************************************
 
   /**
-   * A simple two-dimensional point.
-   */
-  class Point(var x: Double, var y: Double) extends Serializable {
-    def this() {
-      this(0, 0)
-    }
+    * Common trait for operations supported by both points and centroids
+    * Note: case class inheritance is not allowed in Scala
+    */
+  trait Coordinate extends Serializable {
 
-    def add(other: Point): Point = {
+    var x: Double
+    var y: Double
+
+    def add(other: Coordinate): this.type = {
       x += other.x
       y += other.y
       this
     }
 
-    def div(other: Long): Point = {
+    def div(other: Long): this.type = {
       x /= other
       y /= other
       this
     }
 
-    def euclideanDistance(other: Point): Double = {
+    def euclideanDistance(other: Coordinate): Double =
       Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y))
-    }
 
     def clear(): Unit = {
       x = 0
       y = 0
     }
 
-    override def toString: String = {
-      x + " " + y
-    }
+    override def toString: String =
+      s"$x $y"
+
   }
 
   /**
-   * A simple two-dimensional centroid, basically a point with an ID.
-   */
-  class Centroid(var id: Int, x: Double, y: Double) extends Point(x, y) {
-    def this() {
-      this(0, 0, 0)
-    }
+    * A simple two-dimensional point.
+    */
+  case class Point(var x: Double = 0, var y: Double = 0) extends Coordinate
+
+  /**
+    * A simple two-dimensional centroid, basically a point with an ID.
+    */
+  case class Centroid(var id: Int = 0, var x: Double = 0, var y: Double = 0) extends Coordinate {
 
     def this(id: Int, p: Point) {
       this(id, p.x, p.y)
     }
 
-    override def toString: String = {
-      id + " " + super.toString
-    }
+    override def toString: String =
+      s"$id ${super.toString}"
+
   }
 
   /** Determines the closest cluster center for a data point. */
@@ -250,6 +225,7 @@ object KMeans {
     }
 
   }
+
 }
 
 

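The Scala KMeans rework also replaces the Point/Centroid class hierarchy: case classes cannot extend each other, so the shared state moves into a Coordinate trait whose mutators return this.type. A standalone sketch of why that return type matters:

trait Coordinate extends Serializable {
  var x: Double
  var y: Double
  // this.type preserves the concrete subtype, so calls chain on Point
  def add(other: Coordinate): this.type = { x += other.x; y += other.y; this }
  def div(n: Long): this.type = { x /= n; y /= n; this }
}

case class Point(var x: Double = 0, var y: Double = 0) extends Coordinate

object CoordinateSketch {
  def main(args: Array[String]): Unit = {
    // mean of two points; add returns Point, not just Coordinate
    val mean: Point = Point(1.0, 2.0).add(Point(3.0, 4.0)).div(2L)
    println(mean) // Point(2.0,3.0)
  }
}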
http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
index 3df9791..2538aa6 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.examples.scala.graph
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
 import org.apache.flink.util.Collector
@@ -47,7 +48,7 @@ import org.apache.flink.util.Collector
  *
  * Usage:
  * {{{
- *   ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>
+ *   ConnectedComponents --vertices <path> --edges <path> --output <path> --iterations <n>
  * }}}
  *   
  * If no parameters are provided, the program is run with default data from
@@ -61,20 +62,30 @@ import org.apache.flink.util.Collector
  *   
  */
 object ConnectedComponents {
+
   def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
+
+    val params: ParameterTool = ParameterTool.fromArgs(args)
+    println("Usage: ConnectedComponents " +
+      "--vertices <path> --edges <path> --output <path> --iterations <n>")
+
     // set up execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
 
+    val maxIterations: Int = params.getInt("iterations", 10)
+
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
     // read vertex and edge data
     // assign the initial components (equal to the vertex id)
-    val vertices = getVerticesDataSet(env).map { id => (id, id) }.withForwardedFields("*->_1;*->_2")
+    val vertices =
+      getVertexDataSet(env, params).map { id => (id, id) }.withForwardedFields("*->_1;*->_2")
 
-    // undirected edges by emitting for each input edge the input edges itself and an inverted
-    // version
-    val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
+    // undirected edges by emitting for each input edge the input
+    // edges itself and an inverted version
+    val edges =
+      getEdgeDataSet(env, params).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
 
     // open a delta iteration
     val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array("_1")) {
@@ -97,62 +108,43 @@ object ConnectedComponents {
         // delta and new workset are identical
         (updatedComponents, updatedComponents)
     }
-    if (fileOutput) {
-      verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
+
+    if (params.has("output")) {
+      verticesWithComponents.writeAsCsv(params.get("output"), "\n", " ")
       env.execute("Scala Connected Components Example")
     } else {
+      println("Printing result to stdout. Use --output to specify output path.")
       verticesWithComponents.print()
     }
 
   }
- 
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 4) {
-        verticesPath = args(0)
-        edgesPath = args(1)
-        outputPath = args(2)
-        maxIterations = args(3).toInt
-
-        true
-      } else {
-        System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path>" +
-          " <max number of iterations>")
-
-        false
-      }
-    } else {
-      System.out.println("Executing Connected Components example with built-in default data.")
-      System.out.println("  Provide parameters to read input data from a file.")
-      System.out.println("  Usage: ConnectedComponents <vertices path> <edges path> <result path>" +
-        " <max number of iterations>")
-
-      true
-    }
-  }
 
-  private def getVerticesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
-    if (fileOutput) {
-       env.readCsvFile[Tuple1[Long]](
-        verticesPath,
+  private def getVertexDataSet(env: ExecutionEnvironment, params: ParameterTool): DataSet[Long] = {
+    if (params.has("vertices")) {
+      env.readCsvFile[Tuple1[Long]](
+        params.get("vertices"),
         includedFields = Array(0))
         .map { x => x._1 }
     }
     else {
+      println("Executing ConnectedComponents example with default vertices data set.")
+      println("Use --vertices to specify file input.")
       env.fromCollection(ConnectedComponentsData.VERTICES)
     }
   }
-  
-  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = {
-    if (fileOutput) {
+
+  private def getEdgeDataSet(env: ExecutionEnvironment, params: ParameterTool):
+                     DataSet[(Long, Long)] = {
+    if (params.has("edges")) {
       env.readCsvFile[(Long, Long)](
-        edgesPath,
+        params.get("edges"),
         fieldDelimiter = " ",
         includedFields = Array(0, 1))
         .map { x => (x._1, x._2)}
     }
     else {
+      println("Executing ConnectedComponents example with default edges data set.")
+      println("Use --edges to specify file input.")
       val edgeData = ConnectedComponentsData.EDGES map {
         case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
       }
@@ -160,9 +152,4 @@ object ConnectedComponents {
     }
   }
 
-  private var fileOutput: Boolean = false
-  private var verticesPath: String = null
-  private var edgesPath: String = null
-  private var maxIterations: Int = 10
-  private var outputPath: String = null
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
index 41fb307..159891b 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
@@ -58,7 +58,7 @@ object DeltaPageRank {
           val targets = adj._2
           val rankPerTarget = INITIAL_RANK * DAMPENING_FACTOR / targets.length
 
-          // dampend fraction to targets
+          // dampen fraction to targets
           for (target <- targets) {
             out.collect((target, rankPerTarget))
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTriangles.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTriangles.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTriangles.scala
index cff53d8..88037c5 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTriangles.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTriangles.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.examples.scala.graph
 
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import scala.collection.JavaConverters._
 import org.apache.flink.api.scala.ExecutionEnvironment
@@ -68,15 +69,31 @@ import scala.collection.mutable
 object EnumTriangles {
 
   def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
+
+    val params: ParameterTool = ParameterTool.fromArgs(args)
+    println("Usage: EnumTriangleBasic --edges <path> --output <path>")
 
     // set up execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
 
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
     // read input data
-    val edges = getEdgeDataSet(env)
+    val edges =
+      if (params.has("edges")) {
+        env.readCsvFile[Edge](
+          filePath = params.get("edges"),
+          fieldDelimiter = " ",
+          includedFields = Array(0, 1))
+      } else {
+        println("Executing EnumTriangles example with default edges data set.")
+        println("Use --edges to specify file input.")
+        val edges = EnumTrianglesData.EDGES.map {
+          case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int])
+        }
+        env.fromCollection(edges)
+      }
     
     // project edges by vertex id
     val edgesById = edges map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1) )
@@ -89,11 +106,12 @@ object EnumTriangles {
               .withForwardedFieldsFirst("*")
     
     // emit result
-    if (fileOutput) {
-      triangles.writeAsCsv(outputPath, "\n", ",")
+    if (params.has("output")) {
+      triangles.writeAsCsv(params.get("output"), "\n", ",")
       // execute program
       env.execute("TriangleEnumeration Example")
     } else {
+      println("Printing result to stdout. Use --output to specify output path.")
       triangles.print()
     }
     
@@ -139,47 +157,4 @@ object EnumTriangles {
     }
   }
 
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 2) {
-        edgePath = args(0)
-        outputPath = args(1)
-
-        true
-      } else {
-        System.err.println("Usage: EnumTriangleBasic <edge path> <result path>")
-
-        false
-      }
-    } else {
-      System.out.println("Executing Enum Triangles Basic example with built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>")
-
-      true
-    }
-  }
-
-  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
-    if (fileOutput) {
-      env.readCsvFile[Edge](edgePath, fieldDelimiter = " ", includedFields = Array(0, 1))
-    } else {
-      val edges = EnumTrianglesData.EDGES.map {
-        case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int])
-      }
-      env.fromCollection(edges)
-    }
-  }
-  
-  
-  private var fileOutput: Boolean = false
-  private var edgePath: String = null
-  private var outputPath: String = null
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
index e1d4af6..84ebff5 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -20,6 +20,7 @@ package org.apache.flink.examples.scala.graph
 import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.examples.java.graph.util.PageRankData
 import org.apache.flink.api.java.aggregation.Aggregations.SUM
@@ -53,7 +54,7 @@ import scala.collection.JavaConverters._
  *
  * Usage:
  * {{{
- *   PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>
+ *   PageRankBasic --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
  * }}}
  *
  * If no parameters are provided, the program is run with default data from
@@ -72,16 +73,21 @@ object PageRankBasic {
   private final val EPSILON: Double = 0.0001
 
   def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
+
+    val params: ParameterTool = ParameterTool.fromArgs(args)
+    println("Usage: PageRankBasic " +
+      "--pages <path> --links <path> --output <path> --numPages <n> --iterations <n>")
 
     // set up execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
 
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
     // read input data
-    val pages = getPagesDataSet(env)
-    val links = getLinksDataSet(env)
+    val (pages, numPages) = getPagesDataSet(env, params)
+    val links = getLinksDataSet(env, params)
+    val maxIterations = params.getInt("iterations", 10)
 
     // assign initial ranks to pages
     val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages)).withForwardedFields("*->pageId")
@@ -126,11 +132,12 @@ object PageRankBasic {
     val result = finalRanks
 
     // emit result
-    if (fileOutput) {
-      result.writeAsCsv(outputPath, "\n", " ")
+    if (params.has("output")) {
+      result.writeAsCsv(params.get("output"), "\n", " ")
       // execute program
       env.execute("Basic PageRank Example")
     } else {
+      println("Printing result to stdout. Use --output to specify output path.")
       result.print()
     }
   }
@@ -149,62 +156,32 @@ object PageRankBasic {
   //     UTIL METHODS
   // *************************************************************************
 
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 5) {
-        pagesInputPath = args(0)
-        linksInputPath = args(1)
-        outputPath = args(2)
-        numPages = args(3).toLong
-        maxIterations = args(4).toInt
-
-        true
-      } else {
-        System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
-          "pages> <num iterations>")
-
-        false
-      }
-    } else {
-      System.out.println("Executing PageRank Basic example with default parameters and built-in " +
-        "default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> <num " +
-        "pages> <num iterations>")
-
-      numPages = PageRankData.getNumberOfPages
-
-      true
-    }
-  }
-
-  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
-    if (fileOutput) {
-      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
+  private def getPagesDataSet(env: ExecutionEnvironment, params: ParameterTool):
+                     (DataSet[Long], Long) = {
+    if (params.has("pages") && params.has("numPages")) {
+      val pages = env
+        .readCsvFile[Tuple1[Long]](params.get("pages"), fieldDelimiter = " ", lineDelimiter = "\n")
         .map(x => x._1)
+      (pages, params.getLong("numPages"))
     } else {
-      env.generateSequence(1, 15)
+      println("Executing PageRank example with default pages data set.")
+      println("Use --pages and --numPages to specify file input.")
+      (env.generateSequence(1, 15), PageRankData.getNumberOfPages)
     }
   }
 
-  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
-    if (fileOutput) {
-      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
+  private def getLinksDataSet(env: ExecutionEnvironment, params: ParameterTool):
+                      DataSet[Link] = {
+    if (params.has("links")) {
+      env.readCsvFile[Link](params.get("links"), fieldDelimiter = " ",
         includedFields = Array(0, 1))
     } else {
+      println("Executing PageRank example with default links data set.")
+      println("Use --links to specify file input.")
       val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
         v2.asInstanceOf[Long])}
       env.fromCollection(edges)
     }
   }
 
-  private var fileOutput: Boolean = false
-  private var pagesInputPath: String = null
-  private var linksInputPath: String = null
-  private var outputPath: String = null
-  private var numPages: Long = 0
-  private var maxIterations: Int = 10
-
 }

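One detail worth noting in getPagesDataSet above: ParameterTool accessors without a default throw when the flag is absent, which is why params.getLong("numPages") sits behind the params.has(...) guard while the iteration count uses a default. A tiny hypothetical demo:

import org.apache.flink.api.java.utils.ParameterTool

object AccessorSketch {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val iterations = params.getInt("iterations", 10) // optional, defaulted
    val numPages =
      if (params.has("numPages")) params.getLong("numPages") // would throw if absent
      else 15L // same fallback as the default pages data set
    println(s"iterations=$iterations, numPages=$numPages")
  }
}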
http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index b7c0714..d989969 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.examples.scala.graph
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
 import org.apache.flink.util.Collector
@@ -25,13 +26,33 @@ import org.apache.flink.util.Collector
 object  TransitiveClosureNaive {
 
   def main (args: Array[String]): Unit = {
-    if (!parseParameters(args)) {
-      return
-    }
 
+    val params: ParameterTool = ParameterTool.fromArgs(args)
+    println("Usage: TransitiveClosure --edges <path> --output <path> --iterations <n>")
+
+    // set up execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
 
-    val edges = getEdgesDataSet(env)
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
+    val edges =
+      if (params.has("edges")) {
+        env.readCsvFile[(Long, Long)](
+          filePath = params.get("edges"),
+          fieldDelimiter = " ",
+          includedFields = Array(0, 1))
+          .map { x => (x._1, x._2)}
+      } else {
+        println("Executing TransitiveClosure example with default edges data set.")
+        println("Use --edges to specify file input.")
+        val edgeData = ConnectedComponentsData.EDGES map {
+          case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
+        }
+        env.fromCollection(edgeData)
+      }
+
+    val maxIterations = params.getInt("iterations", 10)
 
     val paths = edges.iterateWithTermination(maxIterations) { prevPaths: DataSet[(Long, Long)] =>
 
@@ -56,61 +77,14 @@ object  TransitiveClosureNaive {
       (nextPaths, terminate)
     }
 
-    if (fileOutput) {
-      paths.writeAsCsv(outputPath, "\n", " ")
+    if (params.has("output")) {
+      paths.writeAsCsv(params.get("output"), "\n", " ")
       env.execute("Scala Transitive Closure Example")
     } else {
+      println("Printing result to stdout. Use --output to specify output path.")
       paths.print()
     }
 
-
-
-  }
-
-
-  private var fileOutput: Boolean = false
-  private var edgesPath: String = null
-  private var outputPath: String = null
-  private var maxIterations: Int = 10
-
-  private def parseParameters(programArguments: Array[String]): Boolean = {
-    if (programArguments.length > 0) {
-      fileOutput = true
-      if (programArguments.length == 3) {
-        edgesPath = programArguments(0)
-        outputPath = programArguments(1)
-        maxIterations = Integer.parseInt(programArguments(2))
-      }
-      else {
-        System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of " +
-          "iterations>")
-        return false
-      }
-    }
-    else {
-      System.out.println("Executing TransitiveClosure example with default parameters and " +
-        "built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("  Usage: TransitiveClosure <edges path> <result path> <max number of " +
-        "iterations>")
-    }
-    true
   }
 
-  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long)](
-        edgesPath,
-        fieldDelimiter = " ",
-        includedFields = Array(0, 1))
-        .map { x => (x._1, x._2)}
-    }
-    else {
-      val edgeData = ConnectedComponentsData.EDGES map {
-        case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
-      }
-      env.fromCollection(edgeData)
-    }
-  }
 }

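TransitiveClosureNaive keeps its iterateWithTermination loop; for readers unfamiliar with it, the iteration stops at maxIterations or as soon as the termination-criterion DataSet comes back empty. A minimal sketch under that assumption (the doubling job is made up):

import org.apache.flink.api.scala._

object TerminationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val start = env.fromElements(1L)

    // double each step; stop once no value is <= 100 anymore
    val result = start.iterateWithTermination(50) { prev =>
      val next = prev.map(_ * 2)
      val continueCriterion = next.filter(_ <= 100) // empty => stop
      (next, continueCriterion)
    }
    result.print() // 128, the first value past 100
  }
}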
http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
index 2a7b786..eec84b9 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.examples.scala.ml
 
 import org.apache.flink.api.common.functions._
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.examples.java.ml.util.LinearRegressionData
@@ -61,13 +62,37 @@ import scala.collection.JavaConverters._
 object LinearRegression {
 
   def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
 
+    val params: ParameterTool = ParameterTool.fromArgs(args)
+    println("Usage: LinearRegression --input <path> --output <path> --iterations <n>")
+
+    // set up execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val data = getDataSet(env)
-    val parameters = getParamsDataSet(env)
+
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
+    val parameters = env.fromCollection(LinearRegressionData.PARAMS map {
+      case Array(x, y) => Params(x.asInstanceOf[Double], y.asInstanceOf[Double])
+    })
+
+    val data =
+      if (params.has("input")) {
+        env.readCsvFile[(Double, Double)](
+          params.get("input"),
+          fieldDelimiter = " ",
+          includedFields = Array(0, 1))
+          .map { t => new Data(t._1, t._2) }
+      } else {
+        println("Executing LinearRegression example with default input data set.")
+        println("Use --input to specify file input.")
+        val data = LinearRegressionData.DATA map {
+          case Array(x, y) => Data(x.asInstanceOf[Double], y.asInstanceOf[Double])
+        }
+        env.fromCollection(data)
+      }
+
+    val numIterations = params.getInt("iterations", 10)
 
     val result = parameters.iterate(numIterations) { currentParameters =>
       val newParameters = data
@@ -80,11 +105,11 @@ object LinearRegression {
       newParameters
     }
 
-    if (fileOutput) {
-      result.writeAsText(outputPath)
+    if (params.has("output")) {
+      result.writeAsText(params.get("output"))
       env.execute("Scala Linear Regression example")
-    }
-    else {
+    } else {
+      println("Printing result to stdout. Use --output to specify output path.")
       result.print()
     }
   }
@@ -133,63 +158,4 @@ object LinearRegression {
     }
   }
 
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-  private var fileOutput: Boolean = false
-  private var dataPath: String = null
-  private var outputPath: String = null
-  private var numIterations: Int = 10
-
-  private def parseParameters(programArguments: Array[String]): Boolean = {
-    if (programArguments.length > 0) {
-      fileOutput = true
-      if (programArguments.length == 3) {
-        dataPath = programArguments(0)
-        outputPath = programArguments(1)
-        numIterations = programArguments(2).toInt
-
-        true
-      }
-      else {
-        System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>")
-
-        false
-      }
-    }
-    else {
-      System.out.println("Executing Linear Regression example with default parameters and " +
-        "built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("  We provide a data generator to create synthetic input files for this " +
-        "program.")
-      System.out.println("  Usage: LinearRegression <data path> <result path> <num iterations>")
-
-      true
-    }
-  }
-
-  private def getDataSet(env: ExecutionEnvironment): DataSet[Data] = {
-    if (fileOutput) {
-      env.readCsvFile[(Double, Double)](
-        dataPath,
-        fieldDelimiter = " ",
-        includedFields = Array(0, 1))
-        .map { t => new Data(t._1, t._2) }
-    }
-    else {
-      val data = LinearRegressionData.DATA map {
-        case Array(x, y) => Data(x.asInstanceOf[Double], y.asInstanceOf[Double])
-      }
-      env.fromCollection(data)
-    }
-  }
-
-  private def getParamsDataSet(env: ExecutionEnvironment): DataSet[Params] = {
-    val params = LinearRegressionData.PARAMS map {
-      case Array(x, y) => Params(x.asInstanceOf[Double], y.asInstanceOf[Double])
-    }
-    env.fromCollection(params)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
index 9d4d2ee..4962f2c 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.examples.scala.relational
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 
 import org.apache.flink.api.java.aggregation.Aggregations
@@ -68,7 +69,7 @@ import org.apache.flink.api.java.aggregation.Aggregations
  *
  * Usage: 
  * {{{
- *TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation path> <result path>
+ *TPCHQuery10 --customer <path> --orders <path> --lineitem <path> --nation <path> --output <path>
  * }}}
  *  
  * This example shows how to use:
@@ -80,21 +81,32 @@ import org.apache.flink.api.java.aggregation.Aggregations
 object TPCHQuery10 {
 
   def main(args: Array[String]) {
-    if (!parseParameters(args)) {
+
+    val params: ParameterTool = ParameterTool.fromArgs(args)
+    if (!params.has("lineitem") && !params.has("customer") &&
+      !params.has("orders") && !params.has("nation")) {
+      println("  This program expects data from the TPC-H benchmark as input data.")
+      println("  Due to legal restrictions, we can not ship generated data.")
+      println("  You can find the TPC-H data generator at http://www.tpc.org/tpch/.")
+      println("  Usage: TPCHQuery10" +
+        "--customer <path> --orders <path> --lineitem <path> --nation <path> --output <path>")
       return
     }
 
-    // get execution environment
+    // set up execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
 
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
     // get customer data set: (custkey, name, address, nationkey, acctbal) 
-    val customers = getCustomerDataSet(env)
+    val customers = getCustomerDataSet(env, params.get("customer"))
     // get orders data set: (orderkey, custkey, orderdate)
-    val orders = getOrdersDataSet(env)
+    val orders = getOrdersDataSet(env, params.get("orders"))
     // get lineitem data set: (orderkey, extendedprice, discount, returnflag)
-    val lineitems = getLineitemDataSet(env)
+    val lineitems = getLineitemDataSet(env, params.get("lineitem"))
     // get nation data set: (nationkey, name)    
-    val nations = getNationDataSet(env)
+    val nations = getNationDataSet(env, params.get("nation"))
 
     // filter orders by years
     val orders1990 = orders.filter( o => o._3.substring(0,4).toInt > 1990)
@@ -115,67 +127,50 @@ object TPCHQuery10 {
                             .apply( (c, n) => (c._1, c._2, c._3, n._2, c._5) )
                           .join(revenueByCustomer).where(0).equalTo(0)
                             .apply( (c, r) => (c._1, c._2, c._3, c._4, c._5, r._2) )
-    // emit result
-    result.writeAsCsv(outputPath, "\n", "|")
 
-    // execute program
-    env.execute("Scala TPCH Query 10 Example")
+    if (params.has("output")) {
+      // emit result
+      result.writeAsCsv(params.get("output"), "\n", "|")
+      // execute program
+      env.execute("Scala TPCH Query 10 Example")
+    } else {
+      println("Printing result to stdout. Use --output to specify output path.")
+      result.print()
+    }
+
   }
   
-  
   // *************************************************************************
   //     UTIL METHODS
   // *************************************************************************
   
-  private var customerPath: String = null
-  private var ordersPath: String = null
-  private var lineitemPath: String = null
-  private var nationPath: String = null
-  private var outputPath: String = null
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length == 5) {
-      customerPath = args(0)
-      ordersPath = args(1)
-      lineitemPath = args(2)
-      nationPath = args(3)
-      outputPath = args(4)
-      true
-    } else {
-      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-          "  Due to legal restrictions, we can not ship generated data.\n" +
-          "  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
-          "  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> " + 
-                                "<lineitem-csv path> <nation-csv path> <result path>")
-      false
-    }
-  }
-  
-  private def getCustomerDataSet(env: ExecutionEnvironment): 
-                         DataSet[Tuple5[Int, String, String, Int, Double]] = {
-    env.readCsvFile[Tuple5[Int, String, String, Int, Double]](
+  private def getCustomerDataSet(env: ExecutionEnvironment, customerPath: String):
+                         DataSet[(Int, String, String, Int, Double)] = {
+    env.readCsvFile[(Int, String, String, Int, Double)](
         customerPath,
         fieldDelimiter = "|",
         includedFields = Array(0,1,2,3,5) )
   }
   
-  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Tuple3[Int, Int, String]] = {
-    env.readCsvFile[Tuple3[Int, Int, String]](
+  private def getOrdersDataSet(env: ExecutionEnvironment, ordersPath: String):
+                       DataSet[(Int, Int, String)] = {
+    env.readCsvFile[(Int, Int, String)](
         ordersPath,
         fieldDelimiter = "|",
         includedFields = Array(0, 1, 4) )
   }
   
-  private def getLineitemDataSet(env: ExecutionEnvironment):
-                         DataSet[Tuple4[Int, Double, Double, String]] = {
-    env.readCsvFile[Tuple4[Int, Double, Double, String]](
+  private def getLineitemDataSet(env: ExecutionEnvironment, lineitemPath: String):
+                         DataSet[(Int, Double, Double, String)] = {
+    env.readCsvFile[(Int, Double, Double, String)](
         lineitemPath,
         fieldDelimiter = "|",
         includedFields = Array(0, 5, 6, 8) )
   }
 
-  private def getNationDataSet(env: ExecutionEnvironment): DataSet[Tuple2[Int, String]] = {
-    env.readCsvFile[Tuple2[Int, String]](
+  private def getNationDataSet(env: ExecutionEnvironment, nationPath: String):
+                       DataSet[(Int, String)] = {
+    env.readCsvFile[(Int, String)](
         nationPath,
         fieldDelimiter = "|",
         includedFields = Array(0, 1) )

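The guard above treats all four TPC-H inputs as required and exits if any one of them is missing. The same check can be written as a loop over the expected keys, which keeps the list of mandatory inputs in one place. A minimal Java sketch, not part of the commit; the class name and messages are illustrative:

    import org.apache.flink.api.java.utils.ParameterTool;

    public class RequiredParamsCheck {
        public static void main(String[] args) {
            final ParameterTool params = ParameterTool.fromArgs(args);
            // every TPC-H input is mandatory; stop at the first missing one
            for (String key : new String[] {"customer", "orders", "lineitem", "nation"}) {
                if (!params.has(key)) {
                    System.err.println("Missing required option --" + key + " <path>");
                    System.err.println("Usage: TPCHQuery10 --customer <path> --orders <path>"
                            + " --lineitem <path> --nation <path> [--output <path>]");
                    return;
                }
            }
            System.out.println("All TPC-H inputs present.");
        }
    }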
http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
index f6da4bf..d157a75 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.examples.scala.relational
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 
 import org.apache.flink.api.java.aggregation.Aggregations
@@ -61,7 +62,7 @@ import org.apache.flink.api.java.aggregation.Aggregations
  *
  * Usage: 
  * {{{
- * TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+ * TPCHQuery3 --lineitem <path> --customer <path> --orders <path> --output <path>
  * }}}
  *  
  * This example shows how to use:
@@ -72,23 +73,37 @@ import org.apache.flink.api.java.aggregation.Aggregations
 object TPCHQuery3 {
 
   def main(args: Array[String]) {
-    if (!parseParameters(args)) {
+
+    val params: ParameterTool = ParameterTool.fromArgs(args)
+    if (!params.has("lineitem") && !params.has("customer") && !params.has("orders")) {
+      println("  This program expects data from the TPC-H benchmark as input data.")
+      println("  Due to legal restrictions, we can not ship generated data.")
+      println("  You can find the TPC-H data generator at http://www.tpc.org/tpch/.")
+      println("  Usage: TPCHQuery3 " +
+        "--lineitem <path> --customer <path> --orders <path> [--output <path>]")
       return
     }
 
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
     // set filter date
     val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
     val date = dateFormat.parse("1995-03-12")
     
-    // get execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
     // read and filter lineitems by shipDate
-    val lineitems = getLineitemDataSet(env).filter( l => dateFormat.parse(l.shipDate).after(date) )
+    val lineitems =
+      getLineitemDataSet(env, params.get("lineitem")).
+        filter( l => dateFormat.parse(l.shipDate).after(date) )
     // read and filter customers by market segment
-    val customers = getCustomerDataSet(env).filter( c => c.mktSegment.equals("AUTOMOBILE"))
+    val customers =
+      getCustomerDataSet(env, params.get("customer")).
+        filter( c => c.mktSegment.equals("AUTOMOBILE"))
     // read orders
-    val orders = getOrdersDataSet(env)
+    val orders = getOrdersDataSet(env, params.get("order"))
 
                       // filter orders by order date
     val items = orders.filter( o => dateFormat.parse(o.orderDate).before(date) )
@@ -106,11 +121,16 @@ object TPCHQuery3 {
     val result = items.groupBy("orderId", "orderDate", "shipPrio")
                       .aggregate(Aggregations.SUM, "revenue")
 
-    // emit result
-    result.writeAsCsv(outputPath, "\n", "|")
-    
-    // execute program
-    env.execute("Scala TPCH Query 3 Example")
+    if (params.has("output")) {
+      // emit result
+      result.writeAsCsv(params.get("output"), "\n", "|")
+      // execute program
+      env.execute("Scala TPCH Query 3 Example")
+    } else {
+      println("Printing result to stdout. Use --output to specify output path.")
+      result.print()
+    }
+
   }
   
   // *************************************************************************
@@ -126,47 +146,28 @@ object TPCHQuery3 {
   //     UTIL METHODS
   // *************************************************************************
   
-  private var lineitemPath: String = null
-  private var customerPath: String = null
-  private var ordersPath: String = null
-  private var outputPath: String = null
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length == 4) {
-      lineitemPath = args(0)
-      customerPath = args(1)
-      ordersPath = args(2)
-      outputPath = args(3)
-      true
-    } else {
-      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-          " Due to legal restrictions, we can not ship generated data.\n" +
-          " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
-          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + 
-                             "<orders-csv path> <result path>")
-      false
-    }
-  }
-  
-  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
+  private def getLineitemDataSet(env: ExecutionEnvironment, lineitemPath: String):
+                         DataSet[Lineitem] = {
     env.readCsvFile[Lineitem](
         lineitemPath,
         fieldDelimiter = "|",
         includedFields = Array(0, 5, 6, 10) )
   }
 
-  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
+  private def getCustomerDataSet(env: ExecutionEnvironment, customerPath: String):
+                         DataSet[Customer] = {
     env.readCsvFile[Customer](
         customerPath,
         fieldDelimiter = "|",
         includedFields = Array(0, 6) )
   }
-  
-  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
+
+  private def getOrdersDataSet(env: ExecutionEnvironment, ordersPath: String):
+                       DataSet[Order] = {
     env.readCsvFile[Order](
         ordersPath,
         fieldDelimiter = "|",
         includedFields = Array(0, 1, 4, 7) )
   }
-  
+
 }

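Instead of a manual has() check before each getter, ParameterTool also offers getRequired(key), which throws with a message naming the missing key. A short Java sketch of that alternative (illustrative only, not how this commit does it):

    import org.apache.flink.api.java.utils.ParameterTool;

    public class RequiredPathDemo {
        public static void main(String[] args) {
            final ParameterTool params = ParameterTool.fromArgs(args);
            // getRequired throws a RuntimeException that names the missing key,
            // so the job fails fast with a clear message instead of a null path
            String lineitemPath = params.getRequired("lineitem");
            System.out.println("lineitem path: " + lineitemPath);
        }
    }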
http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
index 5c2587f..09e2d52 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.examples.scala.relational
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.examples.java.relational.util.WebLogData
 import org.apache.flink.util.Collector
@@ -74,7 +75,7 @@ import org.apache.flink.util.Collector
  *
  * Usage
  * {{{
- *   WebLogAnalysis <documents path> <ranks path> <visits path> <result path>
+ *   WebLogAnalysis --documents <path> --ranks <path> --visits <path> --output <path>
  * }}}
  *
  * If no parameters are provided, the program is run with default data from
@@ -90,15 +91,19 @@ import org.apache.flink.util.Collector
 object WebLogAnalysis {
 
   def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
 
+    val params: ParameterTool = ParameterTool.fromArgs(args)
+    println("Usage: --documents <path> --ranks <path> --visits <path> --output <path>")
+
+    // set up execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
 
-    val documents = getDocumentsDataSet(env)
-    val ranks = getRanksDataSet(env)
-    val visits = getVisitsDataSet(env)
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
+    val documents = getDocumentsDataSet(env, params)
+    val ranks = getRanksDataSet(env, params)
+    val visits = getVisitsDataSet(env, params)
 
     val filteredDocs = documents
       .filter(doc => doc._2.contains(" editors ") && doc._2.contains(" oscillations "))
@@ -118,60 +123,27 @@ object WebLogAnalysis {
         if (visits.isEmpty) for (rank <- ranks) out.collect(rank)
     }.withForwardedFieldsFirst("*")
 
-
-
-
     // emit result
-    if (fileOutput) {
-      result.writeAsCsv(outputPath, "\n", "|")
+    if (params.has("output")) {
+      result.writeAsCsv(params.get("output"), "\n", "|")
       env.execute("Scala WebLogAnalysis Example")
     } else {
+      println("Printing result to stdout. Use --output to specify output path.")
       result.print()
     }
 
   }
 
-  private var fileOutput: Boolean = false
-  private var documentsPath: String = null
-  private var ranksPath: String = null
-  private var visitsPath: String = null
-  private var outputPath: String = null
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 4) {
-        documentsPath = args(0)
-        ranksPath = args(1)
-        visitsPath = args(2)
-        outputPath = args(3)
-      }
-      else {
-        System.err.println("Usage: WebLogAnalysis <documents path> <ranks path> <visits path> " +
-          "<result path>")
-        return false
-      }
-    }
-    else {
-      System.out.println("Executing WebLog Analysis example with built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("  We provide a data generator to create synthetic input files for this " +
-        "program.")
-      System.out.println("  Usage: WebLogAnalysis <documents path> <ranks path> <visits path> " +
-        "<result path>")
-    }
-    true
-  }
-
-  private def getDocumentsDataSet(env: ExecutionEnvironment): DataSet[(String, String)] = {
-    if (fileOutput) {
+  private def getDocumentsDataSet(env: ExecutionEnvironment, params: ParameterTool):
+      DataSet[(String, String)] = {
+    if (params.has("documents")) {
       env.readCsvFile[(String, String)](
-        documentsPath,
+        params.get("documents"),
         fieldDelimiter = "|",
         includedFields = Array(0, 1))
-    }
-    else {
+    } else {
+      println("Executing WebLogAnalysis example with default documents data set.")
+      println("Use --documents to specify file input.")
       val documents = WebLogData.DOCUMENTS map {
         case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
       }
@@ -179,14 +151,16 @@ object WebLogAnalysis {
     }
   }
 
-  private def getRanksDataSet(env: ExecutionEnvironment): DataSet[(Int, String, Int)] = {
-    if (fileOutput) {
+  private def getRanksDataSet(env: ExecutionEnvironment, params: ParameterTool):
+      DataSet[(Int, String, Int)] = {
+    if (params.has("ranks")) {
       env.readCsvFile[(Int, String, Int)](
-        ranksPath,
+        params.get("ranks"),
         fieldDelimiter = "|",
         includedFields = Array(0, 1, 2))
-    }
-    else {
+    } else {
+      println("Executing WebLogAnalysis example with default ranks data set.")
+      println("Use --ranks to specify file input.")
       val ranks = WebLogData.RANKS map {
         case Array(x, y, z) => (x.asInstanceOf[Int], y.asInstanceOf[String], z.asInstanceOf[Int])
       }
@@ -194,14 +168,16 @@ object WebLogAnalysis {
     }
   }
 
-  private def getVisitsDataSet(env: ExecutionEnvironment): DataSet[(String, String)] = {
-    if (fileOutput) {
+  private def getVisitsDataSet(env: ExecutionEnvironment, params: ParameterTool):
+      DataSet[(String, String)] = {
+    if (params.has("visits")) {
       env.readCsvFile[(String, String)](
-        visitsPath,
+        params.get("visits"),
         fieldDelimiter = "|",
         includedFields = Array(1, 2))
-    }
-    else {
+    } else {
+      println("Executing WebLogAnalysis example with default visits data set.")
+      println("Use --visits to specify file input.")
       val visits = WebLogData.VISITS map {
         case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
       }

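setGlobalJobParameters does more than show the arguments in the web interface: the registered parameters can be read back inside any rich function at runtime. A Java sketch of that round trip; the mapper and the "prefix" option are invented for illustration:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;

    public class ParamAwareMapper extends RichMapFunction<String, String> {
        private String prefix;

        @Override
        public void open(Configuration config) {
            // read back the parameters registered via setGlobalJobParameters
            ParameterTool params = (ParameterTool) getRuntimeContext()
                    .getExecutionConfig().getGlobalJobParameters();
            prefix = params.get("prefix", "");  // "prefix" is a hypothetical option
        }

        @Override
        public String map(String value) {
            return prefix + value;
        }
    }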
http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
index 68092b3..7ce9d51 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.examples.scala.wordcount
 
+import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.examples.java.wordcount.util.WordCountData
 
@@ -29,7 +30,7 @@ import org.apache.flink.examples.java.wordcount.util.WordCountData
  *
  * Usage:
  * {{{
- *   WordCount <text path> <result path>>
+ *   WordCount --input <path> --output <path>
  * }}}
  *
  * If no parameters are provided, the program is run with default data from
@@ -43,59 +44,41 @@ import org.apache.flink.examples.java.wordcount.util.WordCountData
  *
  */
 object WordCount {
+
   def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
 
+    val params: ParameterTool = ParameterTool.fromArgs(args)
+    System.out.println("Usage: WordCount --input <path> --output <path>")
+
+    // set up execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val text = getTextDataSet(env)
+
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+    val text =
+      if (params.has("input")) {
+        env.readTextFile(params.get("input"))
+      } else {
+        println("Executing WordCount example with default input data set.")
+        println("Use --input to specify file input.")
+        env.fromCollection(WordCountData.WORDS)
+      }
 
     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
       .map { (_, 1) }
       .groupBy(0)
       .sum(1)
 
-    if (fileOutput) {
-      counts.writeAsCsv(outputPath, "\n", " ")
+    if (params.has("output")) {
+      counts.writeAsCsv(params.get("output"), "\n", " ")
       env.execute("Scala WordCount Example")
     } else {
+      println("Printing result to stdout. Use --output to specify output path.")
       counts.print()
     }
 
   }
 
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 2) {
-        textPath = args(0)
-        outputPath = args(1)
-        true
-      } else {
-        System.err.println("Usage: WordCount <text path> <result path>")
-        false
-      }
-    } else {
-      System.out.println("Executing WordCount example with built-in default data.")
-      System.out.println("  Provide parameters to read input data from a file.")
-      System.out.println("  Usage: WordCount <text path> <result path>")
-      true
-    }
-  }
-
-  private def getTextDataSet(env: ExecutionEnvironment): DataSet[String] = {
-    if (fileOutput) {
-      env.readTextFile(textPath)
-    }
-    else {
-      env.fromCollection(WordCountData.WORDS)
-    }
-  }
-
-  private var fileOutput: Boolean = false
-  private var textPath: String = null
-  private var outputPath: String = null
 }
 
 

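As a quick illustration of the parsing behavior these examples now rely on: ParameterTool.fromArgs expects "--key value" pairs, and the getters accept defaults, which is what lets the examples fall back to bundled data. A small self-contained Java sketch (the paths are made up):

    import org.apache.flink.api.java.utils.ParameterTool;

    public class FromArgsDemo {
        public static void main(String[] args) {
            ParameterTool params = ParameterTool.fromArgs(
                    new String[] {"--input", "/tmp/words.txt"});
            System.out.println(params.has("input"));            // true
            System.out.println(params.get("input"));            // /tmp/words.txt
            System.out.println(params.get("output", "stdout")); // falls back to the default
        }
    }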
http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 31fa3fd..67718cf 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.examples.iteration;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -37,8 +38,13 @@ import java.util.Random;
  * it performs to reach a specific threshold in an iterative streaming fashion. </p>
  *
  * <p>
- * This example shows how to use: <ul> <li>streaming iterations, <li>buffer timeout to enhance latency, <li>directed
- * outputs. </ul>
+ * This example shows how to use:
+ * <ul>
+ *   <li>streaming iterations,
+ *   <li>buffer timeout to reduce latency,
+ *   <li>directed outputs.
+ * </ul>
+ * </p>
  */
 public class IterateExample {
 
@@ -50,9 +56,9 @@ public class IterateExample {
 
 	public static void main(String[] args) throws Exception {
 
-		if (!parseParameters(args)) {
-			return;
-		}
+		// Checking input parameters
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		System.out.println("  Usage: IterateExample --input <path> --output <path>");
 
 		// set up input for the stream of integer pairs
 
@@ -60,12 +66,17 @@ public class IterateExample {
 		// continuous flushing of the output buffers (lowest latency)
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
 				.setBufferTimeout(1);
+		
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(params);
 
 		// create input stream of integer pairs
 		DataStream<Tuple2<Integer, Integer>> inputStream;
-		if (fileInput) {
-			inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap());
+		if (params.has("input")) {
+			inputStream = env.readTextFile(params.get("input")).map(new FibonacciInputMap());
 		} else {
+			System.out.println("Executing Iterate example with default input data set.");
+			System.out.println("Use --input to specify file input.");
 			inputStream = env.addSource(new RandomFibonacciSource());
 		}
 
@@ -89,9 +100,10 @@ public class IterateExample {
 				.map(new OutputMap());
 
 		// emit results
-		if (fileOutput) {
-			numbers.writeAsText(outputPath);
+		if (params.has("output")) {
+			numbers.writeAsText(params.get("output"));
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			numbers.print();
 		}
 
@@ -210,37 +222,4 @@ public class IterateExample {
 		}
 	}
 
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileInput = false;
-	private static boolean fileOutput = false;
-	private static String inputPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			if (args.length == 1) {
-				fileOutput = true;
-				outputPath = args[0];
-			} else if (args.length == 2) {
-				fileInput = true;
-				inputPath = args[0];
-				fileOutput = true;
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: IterateExample <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing IterateExample with generated data.");
-			System.out.println("  Provide parameter to write to file.");
-			System.out.println("  Usage: IterateExample <result path>");
-		}
-		return true;
-	}
-
 }

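Beyond fromArgs, ParameterTool can also be built from a properties file and merged with other instances, so a job can layer command-line flags over file-based defaults. A sketch under the assumption that /tmp/job.properties exists; the path is invented:

    import org.apache.flink.api.java.utils.ParameterTool;

    public class MergedParamsDemo {
        public static void main(String[] args) throws Exception {
            // file-based defaults; mergeWith gives precedence to the argument,
            // so command-line flags override the file
            ParameterTool defaults = ParameterTool.fromPropertiesFile("/tmp/job.properties");
            ParameterTool params = defaults.mergeWith(ParameterTool.fromArgs(args));
            System.out.println(params.get("output", "stdout"));
        }
    }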
http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 7c64482..216774b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.examples.join;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -60,18 +60,19 @@ public class WindowJoin {
 
 	public static void main(String[] args) throws Exception {
 
-		if (!parseParameters(args)) {
-			return;
-		}
+		// Checking input parameters
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		System.out.println("Usage: WindowJoin --grades <path> --salaries <path> --output <path>");
 
 		// obtain execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(params);
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
 		// connect to the data sources for grades and salaries
-		Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env);
-		DataStream<Tuple3<Long, String, Integer>> grades = input.f0;
-		DataStream<Tuple3<Long, String, Integer>> salaries = input.f1;
+		DataStream<Tuple3<Long, String, Integer>> grades = getGradesStream(env, params);
+		DataStream<Tuple3<Long, String, Integer>> salaries = getSalariesStream(env, params);
 
 		// extract the timestamps
 		grades = grades.assignTimestampsAndWatermarks(new MyTimestampExtractor());
@@ -87,9 +88,10 @@ public class WindowJoin {
 				.apply(new MyJoinFunction());
 
 		// emit result
-		if (fileOutput) {
-			joinedStream.writeAsText(outputPath);
+		if (params.has("output")) {
+			joinedStream.writeAsText(params.get("output"));
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			joinedStream.print();
 		}
 
@@ -233,54 +235,24 @@ public class WindowJoin {
 	// UTIL METHODS
 	// *************************************************************************
 
-	private static boolean fileInput = false;
-	private static boolean fileOutput = false;
-
-	private static String gradesPath;
-	private static String salariesPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			if (args.length == 1) {
-				fileOutput = true;
-				outputPath = args[0];
-			} else if (args.length == 3) {
-				fileInput = true;
-				fileOutput = true;
-				gradesPath = args[0];
-				salariesPath = args[1];
-				outputPath = args[2];
-			} else {
-				System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> <input path 2> " +
-						"<result path>");
-				return false;
-			}
+	private static DataStream<Tuple3<Long, String, Integer>> getGradesStream(StreamExecutionEnvironment env, ParameterTool params) {
+		if (params.has("grades")) {
+			return env.readTextFile(params.get("grades")).map(new MySourceMap());
 		} else {
-			System.out.println("Executing WindowJoin with generated data.");
-			System.out.println("  Provide parameter to write to file.");
-			System.out.println("  Usage: WindowJoin <result path>");
+			System.out.println("Executing WindowJoin example with default grades data set.");
+			System.out.println("Use --grades to specify file input.");
+			return env.addSource(new GradeSource());
 		}
-		return true;
 	}
 
-	private static Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> getInputStreams(
-			StreamExecutionEnvironment env) {
-
-		DataStream<Tuple3<Long, String, Integer>> grades;
-		DataStream<Tuple3<Long, String, Integer>> salaries;
-
-		if (fileInput) {
-			grades = env.readTextFile(gradesPath).map(new MySourceMap());
-			salaries = env.readTextFile(salariesPath).map(new MySourceMap());
+	private static DataStream<Tuple3<Long, String, Integer>> getSalariesStream(StreamExecutionEnvironment env, ParameterTool params) {
+		if (params.has("salaries")) {
+			return env.readTextFile(params.get("salaries")).map(new MySourceMap());
 		} else {
-			grades = env.addSource(new GradeSource());
-			salaries = env.addSource(new SalarySource());
+			System.out.println("Executing WindowJoin example with default salaries data set.");
+			System.out.println("Use --salaries to specify file input.");
+			return env.addSource(new SalarySource());
 		}
-
-		return Tuple2.of(grades, salaries);
 	}
 
 }

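The two getters above differ only in the parameter key and the fallback source, so if more inputs were added the pattern could be factored into one helper. A hypothetical Java sketch, not part of the commit; streamFor and the String element type are illustrative:

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class SourceSelection {
        // read from --<key> when given, otherwise fall back to a generated source
        static DataStream<String> streamFor(StreamExecutionEnvironment env,
                ParameterTool params, String key, SourceFunction<String> fallback) {
            if (params.has(key)) {
                return env.readTextFile(params.get(key));
            }
            System.out.println("Use --" + key + " to specify file input.");
            return env.addSource(fallback);
        }
    }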
http://git-wip-us.apache.org/repos/asf/flink/blob/0629e256/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 405a586..4c1c40f 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.ml;
 
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -57,9 +58,9 @@ public class IncrementalLearningSkeleton {
 
 	public static void main(String[] args) throws Exception {
 
-		if (!parseParameters(args)) {
-			return;
-		}
+		// Checking input parameters
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		System.out.println("Usage: IncrementalLearningSkeleton --output <path>");
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -77,9 +78,10 @@ public class IncrementalLearningSkeleton {
 		DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
 
 		// emit result
-		if (fileOutput) {
-			prediction.writeAsText(outputPath);
+		if (params.has("output")) {
+			prediction.writeAsText(params.get("output"));
 		} else {
+			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			prediction.print();
 		}
 
@@ -217,30 +219,4 @@ public class IncrementalLearningSkeleton {
 
 	}
 
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: IncrementalLearningSkeleton <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing IncrementalLearningSkeleton with generated data.");
-			System.out.println("  Provide parameter to write to file.");
-			System.out.println("  Usage: IncrementalLearningSkeleton <result path>");
-		}
-		return true;
-	}
-
 }


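Finally, ParameterTool is not limited to strings: typed getters with defaults (getInt, getLong, getDouble, getBoolean) cover numeric knobs such as window sizes or thresholds. A brief sketch; the option names are invented for illustration:

    import org.apache.flink.api.java.utils.ParameterTool;

    public class TypedParamsDemo {
        public static void main(String[] args) {
            ParameterTool params = ParameterTool.fromArgs(args);
            int windowSize = params.getInt("window-size", 100);    // hypothetical option
            boolean verbose = params.getBoolean("verbose", false); // hypothetical option
            System.out.println(windowSize + " " + verbose);
        }
    }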