flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [01/50] flink git commit: [FLINK-3195] Rework quickstart example & some cleanups on the new examples
Date Thu, 14 Jan 2016 16:15:58 GMT
Repository: flink
Updated Branches:
  refs/heads/master 62938c110 -> e9bf13d86


[FLINK-3195] Rework quickstart example & some cleanups on the new examples


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7520a53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7520a53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7520a53

Branch: refs/heads/master
Commit: b7520a53f83d23cc6dfd9ff958880837cb0c99ae
Parents: d0e1d63
Author: Robert Metzger <rmetzger@apache.org>
Authored: Wed Jan 6 17:37:26 2016 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Thu Jan 14 11:18:58 2016 +0100

----------------------------------------------------------------------
 docs/page/img/jobmanager.png                    | Bin 97051 -> 0 bytes
 .../jobmanager_kmeans_execute.png               | Bin 0 -> 126912 bytes
 .../jobmanager_kmeans_submit.png                | Bin 0 -> 39526 bytes
 docs/page/img/webclient_job_view.png            | Bin 64972 -> 0 bytes
 docs/page/img/webclient_plan_view.png           | Bin 140756 -> 0 bytes
 docs/quickstart/run_example_quickstart.md       |  59 +--
 flink-dist/pom.xml                              |   6 -
 flink-examples/flink-examples-batch/pom.xml     |   6 +-
 .../examples/java/graph/EnumTriangles.java      | 231 ++++++++++++
 .../examples/java/graph/EnumTrianglesBasic.java | 231 ------------
 .../examples/java/graph/EnumTrianglesOpt.java   | 356 -------------------
 .../java/graph/util/EnumTrianglesData.java      |   3 +-
 .../java/ml/util/LinearRegressionData.java      |  15 +-
 .../examples/scala/graph/EnumTriangles.scala    | 185 ++++++++++
 .../scala/graph/EnumTrianglesBasic.scala        | 185 ----------
 .../examples/scala/graph/EnumTrianglesOpt.scala | 253 -------------
 .../EnumTriangleBasicITCase.java                |   4 +-
 .../EnumTriangleOptITCase.java                  |  46 ---
 .../EnumTriangleBasicITCase.java                |  46 ---
 .../EnumTriangleITCase.java                     |  46 +++
 .../EnumTriangleOptITCase.java                  |  46 ---
 21 files changed, 496 insertions(+), 1222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/docs/page/img/jobmanager.png
----------------------------------------------------------------------
diff --git a/docs/page/img/jobmanager.png b/docs/page/img/jobmanager.png
deleted file mode 100644
index eaa5838..0000000
Binary files a/docs/page/img/jobmanager.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/docs/page/img/quickstart-example/jobmanager_kmeans_execute.png
----------------------------------------------------------------------
diff --git a/docs/page/img/quickstart-example/jobmanager_kmeans_execute.png b/docs/page/img/quickstart-example/jobmanager_kmeans_execute.png
new file mode 100644
index 0000000..00d323f
Binary files /dev/null and b/docs/page/img/quickstart-example/jobmanager_kmeans_execute.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/docs/page/img/quickstart-example/jobmanager_kmeans_submit.png
----------------------------------------------------------------------
diff --git a/docs/page/img/quickstart-example/jobmanager_kmeans_submit.png b/docs/page/img/quickstart-example/jobmanager_kmeans_submit.png
new file mode 100644
index 0000000..974e03e
Binary files /dev/null and b/docs/page/img/quickstart-example/jobmanager_kmeans_submit.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/docs/page/img/webclient_job_view.png
----------------------------------------------------------------------
diff --git a/docs/page/img/webclient_job_view.png b/docs/page/img/webclient_job_view.png
deleted file mode 100644
index dbd64c7..0000000
Binary files a/docs/page/img/webclient_job_view.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/docs/page/img/webclient_plan_view.png
----------------------------------------------------------------------
diff --git a/docs/page/img/webclient_plan_view.png b/docs/page/img/webclient_plan_view.png
deleted file mode 100644
index 201ef66..0000000
Binary files a/docs/page/img/webclient_plan_view.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/docs/quickstart/run_example_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/quickstart/run_example_quickstart.md b/docs/quickstart/run_example_quickstart.md
index 4493812..2d399a8 100644
--- a/docs/quickstart/run_example_quickstart.md
+++ b/docs/quickstart/run_example_quickstart.md
@@ -23,7 +23,8 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
+This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
+On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
 
 ## Setup Flink
 Follow the [instructions](setup_quickstart.html) to setup Flink and enter the root directory of your Flink setup.
@@ -36,7 +37,7 @@ Flink contains a data generator for K-Means.
 mkdir kmeans
 cd kmeans
 # Run data generator
-java -cp ../examples/KMeans.jar:../lib/flink-dist-{{ site.version }}.jar \
+java -cp ../examples/batch/KMeans.jar:../lib/flink-dist-{{ site.version }}.jar \
   org.apache.flink.examples.java.clustering.util.KMeansDataGenerator \
   -points 500 -k 10 -stddev 0.08 -output `pwd`
 ~~~
@@ -78,70 +79,48 @@ Start Flink and the web job submission client on your local machine.
 cd ..
 # start Flink
 ./bin/start-local.sh
-# Start the web client
-./bin/start-webclient.sh
 ~~~
 
 ## Inspect and Run the K-Means Example Program
-The Flink web client allows to submit Flink programs using a graphical user interface.
+The Flink web interface allows to submit Flink programs using a graphical user interface.
 
 <div class="row" style="padding-top:15px">
 	<div class="col-md-6">
-		<a data-lightbox="compiler" href="{{ site.baseurl }}/page/img/webclient_job_view.png" data-lightbox="example-1"><img class="img-responsive" src="{{ site.baseurl }}/page/img/webclient_job_view.png" /></a>
+		<a data-lightbox="compiler" href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager_kmeans_submit.png" data-lightbox="example-1"><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager_kmeans_submit.png" /></a>
 	</div>
 	<div class="col-md-6">
-		1. Open web client on  <a href="http://localhost:8080/launch.html">localhost:8080</a> <br>
-		2. Upload the K-Mean job JAR file. 
+		1. Open web interface on <a href="http://localhost:8081">localhost:8081</a> <br>
+		2. Select the "Submit new Job" page in the menu <br>
+		3. Upload the <code>KMeans.jar</code> from <code>examples/batch</code> by clicking the "Add New" button, and then the "Upload" button. <br>
+		4. Select the <code>KMeans.jar</code> form the list of jobs <br>
+		5. Enter the arguments and options in the lower box: <br>
+		    Leave the <i>Entry Class</i> and <i>Parallelism</i> form empty<br>
+		    Enter the following program arguments: <br>
+		    (KMeans expects the following args: <code>&lt;points path&gt; &lt;centers path&gt; &lt;result path&gt; &lt;num iterations&gt;</code>
 			{% highlight bash %}
-			./examples/KMeans.jar
-			{% endhighlight %} </br>
-		3. Select it in the left box to see how the operators in the plan are connected to each other. <br>
-		4. Enter the arguments and options in the lower left box: <br>
-            Arguments: <br>
-			{% highlight bash %}
-			file://<pathToFlink>/kmeans/points file://<pathToFlink>/kmeans/centers file://<pathToFlink>/kmeans/result 10
-			{% endhighlight %}
-			For example:
-			{% highlight bash %}
-			file:///tmp/flink/kmeans/points file:///tmp/flink/kmeans/centers file:///tmp/flink/kmeans/result 10
-			{% endhighlight %}
-            Options (optional): (set the default parallelims, e.g., to 4) <br>
-			{% highlight bash %}
-            -p 4
-			{% endhighlight %}
+            /tmp/kmeans/points /tmp/kmeans/centers /tmp/kmeans/result 10
+			{% endhighlight %} <br>
+		6. Press <b>Submit</b> to start the job
 	</div>
 </div>
 <hr>
 <div class="row" style="padding-top:15px">
 	<div class="col-md-6">
-		<a data-lightbox="compiler" href="{{ site.baseurl }}/page/img/webclient_plan_view.png" data-lightbox="example-1"><img class="img-responsive" src="{{ site.baseurl }}/page/img/webclient_plan_view.png" /></a>
+		<a data-lightbox="compiler" href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager_kmeans_execute.png" data-lightbox="example-1"><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager_kmeans_execute.png" /></a>
 	</div>
 
 	<div class="col-md-6">
-		1. Press the <b>RunJob</b> to see the optimizer plan. <br>
-		2. Inspect the operators and see the properties (input sizes, cost estimation) determined by the optimizer.
-	</div>
-</div>
-<hr>
-<div class="row" style="padding-top:15px">
-	<div class="col-md-6">
-		<a data-lightbox="compiler" href="{{ site.baseurl }}/page/img/jobmanager.png" data-lightbox="example-1"><img class="img-responsive" src="{{ site.baseurl }}/page/img/jobmanager.png" /></a>
-	</div>
-	<div class="col-md-6">
-		1. Press the <b>Continue</b> button to start executing the job. <br>
-		2. <a href="http://localhost:8080/launch.html">Open Flink's monitoring interface</a> to see the job's progress. (Due to the small input data, the job will finish really quick!)<br>
-		3. Once the job has finished, you can analyze the runtime of the individual operators.
+		Watch the job executing.
 	</div>
 </div>
 
+
 ## Shutdown Flink
 Stop Flink when you are done.
 
 ~~~ bash
 # stop Flink
 ./bin/stop-local.sh
-# Stop the Flink web client
-./bin/stop-webclient.sh
 ~~~
 
 ## Analyze the Result

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 543652f..a4d8d00 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -85,12 +85,6 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-examples-batch</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java</artifactId>
 			<version>${project.version}</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-examples/flink-examples-batch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/pom.xml b/flink-examples/flink-examples-batch/pom.xml
index a989ef5..e31f35c 100644
--- a/flink-examples/flink-examples-batch/pom.xml
+++ b/flink-examples/flink-examples-batch/pom.xml
@@ -256,13 +256,13 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.graph.EnumTrianglesBasic</program-class>
+									<program-class>org.apache.flink.examples.java.graph.EnumTriangles</program-class>
 								</manifestEntries>
 							</archive>
 
 							<includes>
-								<include>**/java/graph/EnumTrianglesBasic.class</include>
-								<include>**/java/graph/EnumTrianglesBasic$*.class</include>
+								<include>**/java/graph/EnumTriangles.class</include>
+								<include>**/java/graph/EnumTriangles$*.class</include>
 								<include>**/java/graph/util/EnumTrianglesDataTypes.class</include>
 								<include>**/java/graph/util/EnumTrianglesDataTypes$*.class</include>
 								<include>**/java/graph/util/EnumTrianglesData.class</include>

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java
new file mode 100644
index 0000000..b789008
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.graph;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
+ * A triangle consists of three edges that connect three vertices with each other.
+ * 
+ * <p>
+ * The algorithm works as follows: 
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
+ * that closes the triangle.
+ *  
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Edges are represented as pairs for vertex IDs which are separated by space 
+ * characters. Edges are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
+ * that include a triangle
+ * </ul>
+ * <pre>
+ *     (1)
+ *     /  \
+ *   (2)-(12)
+ * </pre>
+ * 
+ * Usage: <code>EnumTriangleBasic &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}. 
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Custom Java objects which extend Tuple
+ * <li>Group Sorting
+ * </ul>
+ * 
+ */
+@SuppressWarnings("serial")
+public class EnumTriangles {
+
+	static boolean fileOutput = false;
+	static String edgePath = null;
+	static String outputPath = null;
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+	
+		// read input data
+		DataSet<Edge> edges = getEdgeDataSet(env);
+		
+		// project edges by vertex id
+		DataSet<Edge> edgesById = edges
+				.map(new EdgeByIdProjector());
+		
+		DataSet<Triad> triangles = edgesById
+				// build triads
+				.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
+				// filter triads
+				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(Edge.V1, Edge.V2).with(new TriadFilter());
+
+		// emit result
+		if (fileOutput) {
+			triangles.writeAsCsv(outputPath, "\n", ",");
+			// execute program
+			env.execute("Basic Triangle Enumeration Example");
+		} else {
+			triangles.print();
+		}
+	}
+	
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	/** Converts a Tuple2 into an Edge */
+	@ForwardedFields("0;1")
+	public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
+		private final Edge outEdge = new Edge();
+		
+		@Override
+		public Edge map(Tuple2<Integer, Integer> t) throws Exception {
+			outEdge.copyVerticesFromTuple2(t);
+			return outEdge;
+		}
+	}
+	
+	/** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */
+	private static class EdgeByIdProjector implements MapFunction<Edge, Edge> {
+	
+		@Override
+		public Edge map(Edge inEdge) throws Exception {
+			
+			// flip vertices if necessary
+			if(inEdge.getFirstVertex() > inEdge.getSecondVertex()) {
+				inEdge.flipVertices();
+			}
+			
+			return inEdge;
+		}
+	}
+	
+	/**
+	 *  Builds triads (triples of vertices) from pairs of edges that share a vertex.
+	 *  The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. 
+	 *  Assumes that input edges share the first vertex and are in ascending order of the second vertex.
+	 */
+	@ForwardedFields("0")
+	private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
+		private final List<Integer> vertices = new ArrayList<Integer>();
+		private final Triad outTriad = new Triad();
+		
+		@Override
+		public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
+			
+			final Iterator<Edge> edges = edgesIter.iterator();
+			
+			// clear vertex list
+			vertices.clear();
+			
+			// read first edge
+			Edge firstEdge = edges.next();
+			outTriad.setFirstVertex(firstEdge.getFirstVertex());
+			vertices.add(firstEdge.getSecondVertex());
+			
+			// build and emit triads
+			while (edges.hasNext()) {
+				Integer higherVertexId = edges.next().getSecondVertex();
+				
+				// combine vertex with all previously read vertices
+				for (Integer lowerVertexId : vertices) {
+					outTriad.setSecondVertex(lowerVertexId);
+					outTriad.setThirdVertex(higherVertexId);
+					out.collect(outTriad);
+				}
+				vertices.add(higherVertexId);
+			}
+		}
+	}
+	
+	/** Filters triads (three vertices connected by two edges) without a closing third edge. */
+	private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> {
+		
+		@Override
+		public Triad join(Triad triad, Edge edge) throws Exception {
+			return triad;
+		}
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean parseParameters(String[] args) {
+	
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				edgePath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: EnumTriangleBasic <edge path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing Enum Triangles Basic example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>");
+		}
+		return true;
+	}
+	
+	private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return env.readCsvFile(edgePath)
+						.fieldDelimiter(" ")
+						.includeFields(true, true)
+						.types(Integer.class, Integer.class)
+						.map(new TupleEdgeConverter());
+		} else {
+			return EnumTrianglesData.getDefaultEdgeDataSet(env);
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
deleted file mode 100644
index fd5c5d3..0000000
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.graph;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
-import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
-import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
- * A triangle consists of three edges that connect three vertices with each other.
- * 
- * <p>
- * The algorithm works as follows: 
- * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
- * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
- * that closes the triangle.
- *  
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Edges are represented as pairs for vertex IDs which are separated by space 
- * characters. Edges are separated by new-line characters.<br>
- * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
- * that include a triangle
- * </ul>
- * <pre>
- *     (1)
- *     /  \
- *   (2)-(12)
- * </pre>
- * 
- * Usage: <code>EnumTriangleBasic &lt;edge path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}. 
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>Custom Java objects which extend Tuple
- * <li>Group Sorting
- * </ul>
- * 
- */
-@SuppressWarnings("serial")
-public class EnumTrianglesBasic {
-
-	static boolean fileOutput = false;
-	static String edgePath = null;
-	static String outputPath = null;
-	
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-	
-	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
-			return;
-		}
-		
-		// set up execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-	
-		// read input data
-		DataSet<Edge> edges = getEdgeDataSet(env);
-		
-		// project edges by vertex id
-		DataSet<Edge> edgesById = edges
-				.map(new EdgeByIdProjector());
-		
-		DataSet<Triad> triangles = edgesById
-				// build triads
-				.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
-				// filter triads
-				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(Edge.V1, Edge.V2).with(new TriadFilter());
-
-		// emit result
-		if (fileOutput) {
-			triangles.writeAsCsv(outputPath, "\n", ",");
-			// execute program
-			env.execute("Basic Triangle Enumeration Example");
-		} else {
-			triangles.print();
-		}
-	}
-	
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-
-	/** Converts a Tuple2 into an Edge */
-	@ForwardedFields("0;1")
-	public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
-		private final Edge outEdge = new Edge();
-		
-		@Override
-		public Edge map(Tuple2<Integer, Integer> t) throws Exception {
-			outEdge.copyVerticesFromTuple2(t);
-			return outEdge;
-		}
-	}
-	
-	/** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */
-	private static class EdgeByIdProjector implements MapFunction<Edge, Edge> {
-	
-		@Override
-		public Edge map(Edge inEdge) throws Exception {
-			
-			// flip vertices if necessary
-			if(inEdge.getFirstVertex() > inEdge.getSecondVertex()) {
-				inEdge.flipVertices();
-			}
-			
-			return inEdge;
-		}
-	}
-	
-	/**
-	 *  Builds triads (triples of vertices) from pairs of edges that share a vertex.
-	 *  The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. 
-	 *  Assumes that input edges share the first vertex and are in ascending order of the second vertex.
-	 */
-	@ForwardedFields("0")
-	private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
-		private final List<Integer> vertices = new ArrayList<Integer>();
-		private final Triad outTriad = new Triad();
-		
-		@Override
-		public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
-			
-			final Iterator<Edge> edges = edgesIter.iterator();
-			
-			// clear vertex list
-			vertices.clear();
-			
-			// read first edge
-			Edge firstEdge = edges.next();
-			outTriad.setFirstVertex(firstEdge.getFirstVertex());
-			vertices.add(firstEdge.getSecondVertex());
-			
-			// build and emit triads
-			while (edges.hasNext()) {
-				Integer higherVertexId = edges.next().getSecondVertex();
-				
-				// combine vertex with all previously read vertices
-				for (Integer lowerVertexId : vertices) {
-					outTriad.setSecondVertex(lowerVertexId);
-					outTriad.setThirdVertex(higherVertexId);
-					out.collect(outTriad);
-				}
-				vertices.add(higherVertexId);
-			}
-		}
-	}
-	
-	/** Filters triads (three vertices connected by two edges) without a closing third edge. */
-	private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> {
-		
-		@Override
-		public Triad join(Triad triad, Edge edge) throws Exception {
-			return triad;
-		}
-	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static boolean parseParameters(String[] args) {
-	
-		if(args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if(args.length == 2) {
-				edgePath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: EnumTriangleBasic <edge path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing Enum Triangles Basic example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>");
-		}
-		return true;
-	}
-	
-	private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return env.readCsvFile(edgePath)
-						.fieldDelimiter(" ")
-						.includeFields(true, true)
-						.types(Integer.class, Integer.class)
-						.map(new TupleEdgeConverter());
-		} else {
-			return EnumTrianglesData.getDefaultEdgeDataSet(env);
-		}
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
deleted file mode 100644
index 40604e3..0000000
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.graph;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
-import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
- * A triangle consists of three edges that connect three vertices with each other.
- * 
- * <p>
- * The basic algorithm works as follows: 
- * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
- * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
- * that closes the triangle.
- * 
- * <p>
- * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
- * Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to 
- * reduce the number of triads. 
- * This implementation extends the basic algorithm by computing output degrees of edge vertices and 
- * grouping on edges on the vertex with the smaller degree.
- * 
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Edges are represented as pairs for vertex IDs which are separated by space 
- * characters. Edges are separated by new-line characters.<br>
- * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
- * that include a triangle
- * </ul>
- * <pre>
- *     (1)
- *     /  \
- *   (2)-(12)
- * </pre>
- * 
- * Usage: <code>EnumTriangleOpt &lt;edge path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.graph.util.EnumTrianglesData}.
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>Custom Java objects which extend Tuple
- * <li>Group Sorting
- * </ul>
- * 
- */
-@SuppressWarnings("serial")
-public class EnumTrianglesOpt {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-	
-	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
-			return;
-		}
-		
-		// set up execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// read input data
-		DataSet<EnumTrianglesDataTypes.Edge> edges = getEdgeDataSet(env);
-		
-		// annotate edges with degrees
-		DataSet<EnumTrianglesDataTypes.EdgeWithDegrees> edgesWithDegrees = edges
-				.flatMap(new EdgeDuplicator())
-				.groupBy(EnumTrianglesDataTypes.Edge.V1).sortGroup(EnumTrianglesDataTypes.Edge.V2, Order.ASCENDING).reduceGroup(new DegreeCounter())
-				.groupBy(EnumTrianglesDataTypes.EdgeWithDegrees.V1, EnumTrianglesDataTypes.EdgeWithDegrees.V2).reduce(new DegreeJoiner());
-		
-		// project edges by degrees
-		DataSet<EnumTrianglesDataTypes.Edge> edgesByDegree = edgesWithDegrees
-				.map(new EdgeByDegreeProjector());
-		// project edges by vertex id
-		DataSet<EnumTrianglesDataTypes.Edge> edgesById = edgesByDegree
-				.map(new EdgeByIdProjector());
-		
-		DataSet<EnumTrianglesDataTypes.Triad> triangles = edgesByDegree
-				// build triads
-				.groupBy(EnumTrianglesDataTypes.Edge.V1).sortGroup(EnumTrianglesDataTypes.Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
-				// filter triads
-				.join(edgesById).where(EnumTrianglesDataTypes.Triad.V2, EnumTrianglesDataTypes.Triad.V3).equalTo(EnumTrianglesDataTypes.Edge.V1, EnumTrianglesDataTypes.Edge.V2).with(new TriadFilter());
-
-		// emit result
-		if(fileOutput) {
-			triangles.writeAsCsv(outputPath, "\n", ",");
-			// execute program
-			env.execute("Triangle Enumeration Example");
-		} else {
-			triangles.print();
-		}
-
-		
-	}
-	
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-	
-	/** Converts a Tuple2 into an Edge */
-	@ForwardedFields("0;1")
-	public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, EnumTrianglesDataTypes.Edge> {
-		private final EnumTrianglesDataTypes.Edge outEdge = new EnumTrianglesDataTypes.Edge();
-		
-		@Override
-		public EnumTrianglesDataTypes.Edge map(Tuple2<Integer, Integer> t) throws Exception {
-			outEdge.copyVerticesFromTuple2(t);
-			return outEdge;
-		}
-	}
-	
-	/** Emits for an edge the original edge and its switched version. */
-	private static class EdgeDuplicator implements FlatMapFunction<EnumTrianglesDataTypes.Edge, EnumTrianglesDataTypes.Edge> {
-		
-		@Override
-		public void flatMap(EnumTrianglesDataTypes.Edge edge, Collector<EnumTrianglesDataTypes.Edge> out) throws Exception {
-			out.collect(edge);
-			edge.flipVertices();
-			out.collect(edge);
-		}
-	}
-	
-	/**
-	 * Counts the number of edges that share a common vertex.
-	 * Emits one edge for each input edge with a degree annotation for the shared vertex.
-	 * For each emitted edge, the first vertex is the vertex with the smaller id.
-	 */
-	private static class DegreeCounter implements GroupReduceFunction<EnumTrianglesDataTypes.Edge, EnumTrianglesDataTypes.EdgeWithDegrees> {
-		
-		final ArrayList<Integer> otherVertices = new ArrayList<Integer>();
-		final EnumTrianglesDataTypes.EdgeWithDegrees outputEdge = new EnumTrianglesDataTypes.EdgeWithDegrees();
-		
-		@Override
-		public void reduce(Iterable<EnumTrianglesDataTypes.Edge> edgesIter, Collector<EnumTrianglesDataTypes.EdgeWithDegrees> out) {
-			
-			Iterator<EnumTrianglesDataTypes.Edge> edges = edgesIter.iterator();
-			otherVertices.clear();
-			
-			// get first edge
-			EnumTrianglesDataTypes.Edge edge = edges.next();
-			Integer groupVertex = edge.getFirstVertex();
-			this.otherVertices.add(edge.getSecondVertex());
-			
-			// get all other edges (assumes edges are sorted by second vertex)
-			while (edges.hasNext()) {
-				edge = edges.next();
-				Integer otherVertex = edge.getSecondVertex();
-				// collect unique vertices
-				if(!otherVertices.contains(otherVertex) && otherVertex != groupVertex) {
-					this.otherVertices.add(otherVertex);
-				}
-			}
-			int degree = this.otherVertices.size();
-			
-			// emit edges
-			for(Integer otherVertex : this.otherVertices) {
-				if(groupVertex < otherVertex) {
-					outputEdge.setFirstVertex(groupVertex);
-					outputEdge.setFirstDegree(degree);
-					outputEdge.setSecondVertex(otherVertex);
-					outputEdge.setSecondDegree(0);
-				} else {
-					outputEdge.setFirstVertex(otherVertex);
-					outputEdge.setFirstDegree(0);
-					outputEdge.setSecondVertex(groupVertex);
-					outputEdge.setSecondDegree(degree);
-				}
-				out.collect(outputEdge);
-			}
-		}
-	}
-	
-	/**
-	 * Builds an edge with degree annotation from two edges that have the same vertices and only one 
-	 * degree annotation.
-	 */
-	@ForwardedFields("0;1")
-	private static class DegreeJoiner implements ReduceFunction<EnumTrianglesDataTypes.EdgeWithDegrees> {
-		private final EnumTrianglesDataTypes.EdgeWithDegrees outEdge = new EnumTrianglesDataTypes.EdgeWithDegrees();
-		
-		@Override
-		public EnumTrianglesDataTypes.EdgeWithDegrees reduce(EnumTrianglesDataTypes.EdgeWithDegrees edge1, EnumTrianglesDataTypes.EdgeWithDegrees edge2) throws Exception {
-			
-			// copy first edge
-			outEdge.copyFrom(edge1);
-			
-			// set missing degree
-			if(edge1.getFirstDegree() == 0 && edge1.getSecondDegree() != 0) {
-				outEdge.setFirstDegree(edge2.getFirstDegree());
-			} else if (edge1.getFirstDegree() != 0 && edge1.getSecondDegree() == 0) {
-				outEdge.setSecondDegree(edge2.getSecondDegree());
-			}
-			return outEdge;
-		}
-	}
-		
-	/** Projects an edge (pair of vertices) such that the first vertex is the vertex with the smaller degree. */
-	private static class EdgeByDegreeProjector implements MapFunction<EnumTrianglesDataTypes.EdgeWithDegrees, EnumTrianglesDataTypes.Edge> {
-		
-		private final EnumTrianglesDataTypes.Edge outEdge = new EnumTrianglesDataTypes.Edge();
-		
-		@Override
-		public EnumTrianglesDataTypes.Edge map(EnumTrianglesDataTypes.EdgeWithDegrees inEdge) throws Exception {
-
-			// copy vertices to simple edge
-			outEdge.copyVerticesFromEdgeWithDegrees(inEdge);
-
-			// flip vertices if first degree is larger than second degree.
-			if(inEdge.getFirstDegree() > inEdge.getSecondDegree()) {
-				outEdge.flipVertices();
-			}
-
-			// return edge
-			return outEdge;
-		}
-	}
-	
-	/** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */
-	private static class EdgeByIdProjector implements MapFunction<EnumTrianglesDataTypes.Edge, EnumTrianglesDataTypes.Edge> {
-	
-		@Override
-		public EnumTrianglesDataTypes.Edge map(EnumTrianglesDataTypes.Edge inEdge) throws Exception {
-			
-			// flip vertices if necessary
-			if(inEdge.getFirstVertex() > inEdge.getSecondVertex()) {
-				inEdge.flipVertices();
-			}
-			
-			return inEdge;
-		}
-	}
-	
-	/**
-	 *  Builds triads (triples of vertices) from pairs of edges that share a vertex.
-	 *  The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. 
-	 *  Assumes that input edges share the first vertex and are in ascending order of the second vertex.
-	 */
-	@ForwardedFields("0")
-	private static class TriadBuilder implements GroupReduceFunction<EnumTrianglesDataTypes.Edge, EnumTrianglesDataTypes.Triad> {
-		
-		private final List<Integer> vertices = new ArrayList<Integer>();
-		private final EnumTrianglesDataTypes.Triad outTriad = new EnumTrianglesDataTypes.Triad();
-		
-		@Override
-		public void reduce(Iterable<EnumTrianglesDataTypes.Edge> edgesIter, Collector<EnumTrianglesDataTypes.Triad> out) throws Exception {
-			final Iterator<EnumTrianglesDataTypes.Edge> edges = edgesIter.iterator();
-			
-			// clear vertex list
-			vertices.clear();
-			
-			// read first edge
-			EnumTrianglesDataTypes.Edge firstEdge = edges.next();
-			outTriad.setFirstVertex(firstEdge.getFirstVertex());
-			vertices.add(firstEdge.getSecondVertex());
-			
-			// build and emit triads
-			while (edges.hasNext()) {
-				Integer higherVertexId = edges.next().getSecondVertex();
-				
-				// combine vertex with all previously read vertices
-				for(Integer lowerVertexId : vertices) {
-					outTriad.setSecondVertex(lowerVertexId);
-					outTriad.setThirdVertex(higherVertexId);
-					out.collect(outTriad);
-				}
-				vertices.add(higherVertexId);
-			}
-		}
-	}
-	
-	/** Filters triads (three vertices connected by two edges) without a closing third edge. */
-	private static class TriadFilter implements JoinFunction<EnumTrianglesDataTypes.Triad, EnumTrianglesDataTypes.Edge, EnumTrianglesDataTypes.Triad> {
-		
-		@Override
-		public EnumTrianglesDataTypes.Triad join(EnumTrianglesDataTypes.Triad triad, EnumTrianglesDataTypes.Edge edge) throws Exception {
-			return triad;
-		}
-	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static boolean fileOutput = false;
-	private static String edgePath = null;
-	private static String outputPath = null;
-	
-	private static boolean parseParameters(String[] args) {
-		
-		if(args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if(args.length == 2) {
-				edgePath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: EnumTriangleBasic <edge path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing Enum Triangles Opt example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: EnumTriangleOpt <edge path> <result path>");
-		}
-		return true;
-	}
-	
-	private static DataSet<EnumTrianglesDataTypes.Edge> getEdgeDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return env.readCsvFile(edgePath)
-						.fieldDelimiter(" ")
-						.includeFields(true, true)
-						.types(Integer.class, Integer.class)
-						.map(new TupleEdgeConverter());
-		} else {
-			return EnumTrianglesData.getDefaultEdgeDataSet(env);
-		}
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
index 2453809..a54b3da 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
 
 /**
  * Provides the default data sets used for the Triangle Enumeration example programs.
@@ -50,7 +51,7 @@ public class EnumTrianglesData {
 		
 		List<EnumTrianglesDataTypes.Edge> edges = new ArrayList<EnumTrianglesDataTypes.Edge>();
 		for(Object[] e : EDGES) {
-			edges.add(new EnumTrianglesDataTypes.Edge((Integer)e[0], (Integer)e[1]));
+			edges.add(new Edge((Integer)e[0], (Integer)e[1]));
 		}
 		
 		return env.fromCollection(edges);

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
index fee0eb3..73b4b55 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
@@ -20,7 +20,8 @@ package org.apache.flink.examples.java.ml.util;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.examples.java.ml.LinearRegression;
+import org.apache.flink.examples.java.ml.LinearRegression.Params;
+import org.apache.flink.examples.java.ml.LinearRegression.Data;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -50,20 +51,20 @@ public class LinearRegressionData {
 			new Object[] { 0.28, 0.57 }, new Object[] { 1.65, 3.30 },
 			new Object[] { -0.55, -1.08 } };
 
-	public static DataSet<LinearRegression.Params> getDefaultParamsDataSet(
+	public static DataSet<Params> getDefaultParamsDataSet(
 			ExecutionEnvironment env) {
-		List<LinearRegression.Params> paramsList = new LinkedList<LinearRegression.Params>();
+		List<Params> paramsList = new LinkedList<>();
 		for (Object[] params : PARAMS) {
-			paramsList.add(new LinearRegression.Params((Double) params[0], (Double) params[1]));
+			paramsList.add(new Params((Double) params[0], (Double) params[1]));
 		}
 		return env.fromCollection(paramsList);
 	}
 
-	public static DataSet<LinearRegression.Data> getDefaultDataDataSet(ExecutionEnvironment env) {
+	public static DataSet<Data> getDefaultDataDataSet(ExecutionEnvironment env) {
 
-		List<LinearRegression.Data> dataList = new LinkedList<LinearRegression.Data>();
+		List<Data> dataList = new LinkedList<>();
 		for (Object[] data : DATA) {
-			dataList.add(new LinearRegression.Data((Double) data[0], (Double) data[1]));
+			dataList.add(new Data((Double) data[0], (Double) data[1]));
 		}
 		return env.fromCollection(dataList);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTriangles.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTriangles.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTriangles.scala
new file mode 100644
index 0000000..cff53d8
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTriangles.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.graph
+
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.examples.java.graph.util.EnumTrianglesData
+import org.apache.flink.api.common.operators.Order
+
+import scala.collection.mutable
+
+/**
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
+ * A triangle consists of three edges that connect three vertices with each other.
+ * 
+ * The algorithm works as follows:
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
+ * that closes the triangle.
+ *  
+ * Input files are plain text files and must be formatted as follows:
+ *
+ *  - Edges are represented as pairs for vertex IDs which are separated by space
+ *   characters. Edges are separated by new-line characters.
+ *   For example `"1 2\n2 12\n1 12\n42 63"` gives four (undirected) edges (1)-(2), (2)-(12),
+ *   (1)-(12), and (42)-(63) that include a triangle
+ *
+ * <pre>
+ *     (1)
+ *     /  \
+ *   (2)-(12)
+ * </pre>
+ * 
+ * Usage: 
+ * {{{
+ * EnumTriangleBasic <edge path> <result path>
+ * }}}
+ * <br>
+ * If no parameters are provided, the program is run with default data from 
+ * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]]
+ * 
+ * This example shows how to use:
+ *
+ *  - Custom Java objects which extend Tuple
+ *  - Group Sorting
+ *
+ */
+object EnumTriangles {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val edges = getEdgeDataSet(env)
+    
+    // project edges by vertex id
+    val edgesById = edges map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1) )
+    
+    val triangles = edgesById
+            // build triads
+            .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder())
+            // filter triads
+            .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t }
+              .withForwardedFieldsFirst("*")
+    
+    // emit result
+    if (fileOutput) {
+      triangles.writeAsCsv(outputPath, "\n", ",")
+      // execute program
+      env.execute("TriangleEnumeration Example")
+    } else {
+      triangles.print()
+    }
+    
+
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  case class Edge(v1: Int, v2: Int) extends Serializable
+  case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable
+  
+    
+  // *************************************************************************
+  //     USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   *  Builds triads (triples of vertices) from pairs of edges that share a vertex. The first vertex
+   *  of a triad is the shared vertex, the second and third vertex are ordered by vertexId. Assumes
+   *  that input edges share the first vertex and are in ascending order of the second vertex.
+   */
+  @ForwardedFields(Array("v1->v1"))
+  class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
+
+    val vertices = mutable.MutableList[Integer]()
+    
+    override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {
+      
+      // clear vertex list
+      vertices.clear()
+
+      // build and emit triads
+      for(e <- edges.asScala) {
+      
+        // combine vertex with all previously read vertices
+        for(v <- vertices) {
+          out.collect(Triad(e.v1, v, e.v2))
+        }
+        vertices += e.v2
+      }
+    }
+  }
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 2) {
+        edgePath = args(0)
+        outputPath = args(1)
+
+        true
+      } else {
+        System.err.println("Usage: EnumTriangleBasic <edge path> <result path>")
+
+        false
+      }
+    } else {
+      System.out.println("Executing Enum Triangles Basic example with built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>")
+
+      true
+    }
+  }
+
+  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
+    if (fileOutput) {
+      env.readCsvFile[Edge](edgePath, fieldDelimiter = " ", includedFields = Array(0, 1))
+    } else {
+      val edges = EnumTrianglesData.EDGES.map {
+        case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int])
+      }
+      env.fromCollection(edges)
+    }
+  }
+  
+  
+  private var fileOutput: Boolean = false
+  private var edgePath: String = null
+  private var outputPath: String = null
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
deleted file mode 100644
index 170aa1d..0000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala.graph
-
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
-import org.apache.flink.api.scala._
-import scala.collection.JavaConverters._
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.util.Collector
-import org.apache.flink.examples.java.graph.util.EnumTrianglesData
-import org.apache.flink.api.common.operators.Order
-
-import scala.collection.mutable
-
-/**
- * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
- * A triangle consists of three edges that connect three vertices with each other.
- * 
- * The algorithm works as follows:
- * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
- * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
- * that closes the triangle.
- *  
- * Input files are plain text files and must be formatted as follows:
- *
- *  - Edges are represented as pairs for vertex IDs which are separated by space
- *   characters. Edges are separated by new-line characters.
- *   For example `"1 2\n2 12\n1 12\n42 63"` gives four (undirected) edges (1)-(2), (2)-(12),
- *   (1)-(12), and (42)-(63) that include a triangle
- *
- * <pre>
- *     (1)
- *     /  \
- *   (2)-(12)
- * </pre>
- * 
- * Usage: 
- * {{{
- * EnumTriangleBasic <edge path> <result path>
- * }}}
- * <br>
- * If no parameters are provided, the program is run with default data from 
- * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]]
- * 
- * This example shows how to use:
- *
- *  - Custom Java objects which extend Tuple
- *  - Group Sorting
- *
- */
-object EnumTrianglesBasic {
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // read input data
-    val edges = getEdgeDataSet(env)
-    
-    // project edges by vertex id
-    val edgesById = edges map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1) )
-    
-    val triangles = edgesById
-            // build triads
-            .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder())
-            // filter triads
-            .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t }
-              .withForwardedFieldsFirst("*")
-    
-    // emit result
-    if (fileOutput) {
-      triangles.writeAsCsv(outputPath, "\n", ",")
-      // execute program
-      env.execute("TriangleEnumeration Example")
-    } else {
-      triangles.print()
-    }
-    
-
-  }
-
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-
-  case class Edge(v1: Int, v2: Int) extends Serializable
-  case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable
-  
-    
-  // *************************************************************************
-  //     USER FUNCTIONS
-  // *************************************************************************
-
-  /**
-   *  Builds triads (triples of vertices) from pairs of edges that share a vertex. The first vertex
-   *  of a triad is the shared vertex, the second and third vertex are ordered by vertexId. Assumes
-   *  that input edges share the first vertex and are in ascending order of the second vertex.
-   */
-  @ForwardedFields(Array("v1->v1"))
-  class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
-
-    val vertices = mutable.MutableList[Integer]()
-    
-    override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {
-      
-      // clear vertex list
-      vertices.clear()
-
-      // build and emit triads
-      for(e <- edges.asScala) {
-      
-        // combine vertex with all previously read vertices
-        for(v <- vertices) {
-          out.collect(Triad(e.v1, v, e.v2))
-        }
-        vertices += e.v2
-      }
-    }
-  }
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 2) {
-        edgePath = args(0)
-        outputPath = args(1)
-
-        true
-      } else {
-        System.err.println("Usage: EnumTriangleBasic <edge path> <result path>")
-
-        false
-      }
-    } else {
-      System.out.println("Executing Enum Triangles Basic example with built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>")
-
-      true
-    }
-  }
-
-  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
-    if (fileOutput) {
-      env.readCsvFile[Edge](edgePath, fieldDelimiter = " ", includedFields = Array(0, 1))
-    } else {
-      val edges = EnumTrianglesData.EDGES.map {
-        case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int])
-      }
-      env.fromCollection(edges)
-    }
-  }
-  
-  
-  private var fileOutput: Boolean = false
-  private var edgePath: String = null
-  private var outputPath: String = null
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
deleted file mode 100644
index 060a5f9..0000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala.graph
-
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
-import org.apache.flink.api.scala._
-import scala.collection.JavaConverters._
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.util.Collector
-import org.apache.flink.examples.java.graph.util.EnumTrianglesData
-import org.apache.flink.api.common.operators.Order
-
-import scala.collection.mutable
-
-
-/**
- * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
- * A triangle consists of three edges that connect three vertices with each other.
- *
- * The basic algorithm works as follows:
- * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
- * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
- * that closes the triangle.
- *
- * For a group of ''i'' edges that share a common vertex, the number of built triads is quadratic
- * ''(n*(n-1))/2)''. Therefore, an optimization of the algorithm is to group edges on the vertex
- * with the smaller output degree to reduce the number of triads.
- * This implementation extends the basic algorithm by computing output degrees of edge vertices and 
- * grouping on edges on the vertex with the smaller degree.
- *
- * Input files are plain text files and must be formatted as follows:
- *
- *  - Edges are represented as pairs for vertex IDs which are separated by space
- *    characters. Edges are separated by new-line characters.
- *    For example `"1 2\n2 12\n1 12\n42 63"` gives four (undirected) edges (1)-(2), (2)-(12),
- *    (1)-(12), and (42)-(63) that include a triangle
- *
- * <pre>
- *     (1)
- *     /  \
- *   (2)-(12)
- * </pre>
- *
- * Usage:
- * {{{
- *   EnumTriangleOpt <edge path> <result path>
- * }}}
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]].
- *
- * This example shows how to use:
- *
- *  - Custom Java objects which extend Tuple
- *  - Group Sorting
- *
- */
-object EnumTrianglesOpt {
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // read input data
-    val edges = getEdgeDataSet(env)
-
-    val edgesWithDegrees = edges
-      // duplicate and switch edges
-      .flatMap(e => Seq(e, Edge(e.v2, e.v1)))
-      // add degree of first vertex
-      .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new DegreeCounter())
-      // join degrees of vertices
-      .groupBy("v1", "v2").reduce {
-        (e1, e2) =>
-          if (e1.d2 == 0) {
-            new EdgeWithDegrees(e1.v1, e1.d1, e1.v2, e2.d2)
-          } else {
-            new EdgeWithDegrees(e1.v1, e2.d1, e1.v2, e1.d2)
-          }
-      }.withForwardedFields("v1;v2")
-
-    // project edges by degrees, vertex with smaller degree comes first
-    val edgesByDegree = edgesWithDegrees
-      .map(e => if (e.d1 <= e.d2) Edge(e.v1, e.v2) else Edge(e.v2, e.v1))
-    // project edges by Id, vertex with smaller Id comes first
-    val edgesById = edgesByDegree
-      .map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1))
-
-    val triangles = edgesByDegree
-      // build triads
-      .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder())
-      // filter triads
-      .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t}
-        .withForwardedFieldsFirst("*")
-
-    // emit result
-    if (fileOutput) {
-      triangles.writeAsCsv(outputPath, "\n", ",")
-      // execute program
-      env.execute("TriangleEnumeration Example")
-    } else {
-      triangles.print()
-    }
-
-  }
-
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-
-  case class Edge(v1: Int, v2: Int) extends Serializable
-
-  case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable
-
-  case class EdgeWithDegrees(v1: Int, d1: Int, v2: Int, d2: Int) extends Serializable
-
-
-  // *************************************************************************
-  //     USER FUNCTIONS
-  // *************************************************************************
-
-  /**
-   * Counts the number of edges that share a common vertex.
-   * Emits one edge for each input edge with a degree annotation for the shared vertex.
-   * For each emitted edge, the first vertex is the vertex with the smaller id.
-   */
-  class DegreeCounter extends GroupReduceFunction[Edge, EdgeWithDegrees] {
-
-    val vertices = mutable.MutableList[Integer]()
-    var groupVertex = 0
-
-    override def reduce(edges: java.lang.Iterable[Edge], out: Collector[EdgeWithDegrees]) = {
-
-      // empty vertex list
-      vertices.clear()
-
-      // collect all vertices
-      for (e <- edges.asScala) {
-        groupVertex = e.v1
-        if (!vertices.contains(e.v2) && e.v1 != e.v2) {
-          vertices += e.v2
-        }
-      }
-
-      // count vertices to obtain degree of groupVertex
-      val degree = vertices.length
-
-      // create and emit edges with degrees
-      for (v <- vertices) {
-        if (v < groupVertex) {
-          out.collect(new EdgeWithDegrees(v, 0, groupVertex, degree))
-        } else {
-          out.collect(new EdgeWithDegrees(groupVertex, degree, v, 0))
-        }
-      }
-    }
-  }
-
-  /**
-   * Builds triads (triples of vertices) from pairs of edges that share a vertex.
-   * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by
-   * vertexId.
-   * Assumes that input edges share the first vertex and are in ascending order of the second
-   * vertex.
-   */
-  @ForwardedFields(Array("v1"))
-  class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
-
-    val vertices = mutable.MutableList[Integer]()
-
-    override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {
-
-      // clear vertex list
-      vertices.clear()
-
-      // build and emit triads
-      for (e <- edges.asScala) {
-        // combine vertex with all previously read vertices
-        for (v <- vertices) {
-          out.collect(Triad(e.v1, v, e.v2))
-        }
-        vertices += e.v2
-      }
-    }
-  }
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 2) {
-        edgePath = args(0)
-        outputPath = args(1)
-
-        true
-      } else {
-        System.err.println("Usage: EnumTriangleOpt <edge path> <result path>")
-
-        false
-      }
-    } else {
-      System.out.println("Executing Enum Triangles Optimized example with built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>")
-
-      true
-    }
-  }
-
-  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = {
-    if (fileOutput) {
-      env.readCsvFile[Edge](
-        edgePath,
-        fieldDelimiter = " ",
-        includedFields = Array(0, 1))
-    } else {
-      val edges = EnumTrianglesData.EDGES.map {
-        case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int])}
-      env.fromCollection(edges)
-    }
-  }
-
-
-  private var fileOutput: Boolean = false
-  private var edgePath: String = null
-  private var outputPath: String = null
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleBasicITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleBasicITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleBasicITCase.java
index c064605..9c4a28d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleBasicITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleBasicITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.test.exampleJavaPrograms;
 
-import org.apache.flink.examples.java.graph.EnumTrianglesBasic;
+import org.apache.flink.examples.java.graph.EnumTriangles;
 import org.apache.flink.test.testdata.EnumTriangleData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
@@ -40,7 +40,7 @@ public class EnumTriangleBasicITCase extends JavaProgramTestBase {
 	
 	@Override
 	protected void testProgram() throws Exception {
-		EnumTrianglesBasic.main(new String[] { edgePath, resultPath });
+		EnumTriangles.main(new String[] { edgePath, resultPath });
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleOptITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleOptITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleOptITCase.java
deleted file mode 100644
index 7248319..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/EnumTriangleOptITCase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.exampleJavaPrograms;
-
-import org.apache.flink.examples.java.graph.EnumTrianglesOpt;
-import org.apache.flink.test.testdata.EnumTriangleData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class EnumTriangleOptITCase extends JavaProgramTestBase {
-	
-	protected String edgePath;
-	protected String resultPath;
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		edgePath = createTempFile("edges", EnumTriangleData.EDGES);
-		resultPath = getTempDirPath("triangles");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EnumTriangleData.TRIANGLES_BY_DEGREE, resultPath);
-	}
-	
-	@Override
-	protected void testProgram() throws Exception {
-		EnumTrianglesOpt.main(new String[] { edgePath, resultPath });
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleBasicITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleBasicITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleBasicITCase.java
deleted file mode 100644
index 153bda9..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleBasicITCase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.exampleScalaPrograms;
-
-import org.apache.flink.examples.scala.graph.EnumTrianglesBasic;
-import org.apache.flink.test.testdata.EnumTriangleData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class EnumTriangleBasicITCase extends JavaProgramTestBase {
-	
-	protected String edgePath;
-	protected String resultPath;
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		edgePath = createTempFile("edges", EnumTriangleData.EDGES);
-		resultPath = getTempDirPath("triangles");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EnumTriangleData.TRIANGLES_BY_ID, resultPath);
-	}
-	
-	@Override
-	protected void testProgram() throws Exception {
-		EnumTrianglesBasic.main(new String[] { edgePath, resultPath });
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleITCase.java
new file mode 100644
index 0000000..e23cccf
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.exampleScalaPrograms;
+
+import org.apache.flink.examples.scala.graph.EnumTriangles;
+import org.apache.flink.test.testdata.EnumTriangleData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class EnumTriangleITCase extends JavaProgramTestBase {
+	
+	protected String edgePath;
+	protected String resultPath;
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		edgePath = createTempFile("edges", EnumTriangleData.EDGES);
+		resultPath = getTempDirPath("triangles");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EnumTriangleData.TRIANGLES_BY_ID, resultPath);
+	}
+	
+	@Override
+	protected void testProgram() throws Exception {
+		EnumTriangles.main(new String[] { edgePath, resultPath });
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7520a53/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
deleted file mode 100644
index 9701086..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.exampleScalaPrograms;
-
-import org.apache.flink.examples.scala.graph.EnumTrianglesOpt;
-import org.apache.flink.test.testdata.EnumTriangleData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class EnumTriangleOptITCase extends JavaProgramTestBase {
-	
-	protected String edgePath;
-	protected String resultPath;
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		edgePath = createTempFile("edges", EnumTriangleData.EDGES);
-		resultPath = getTempDirPath("triangles");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EnumTriangleData.TRIANGLES_BY_DEGREE, resultPath);
-	}
-	
-	@Override
-	protected void testProgram() throws Exception {
-		EnumTrianglesOpt.main(new String[] { edgePath, resultPath });
-	}
-
-}


Mime
View raw message