flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [24/60] Renamed java examples package
Date Mon, 22 Sep 2014 12:29:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
new file mode 100644
index 0000000..c3cbc6d
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.graph;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
+
+/**
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
+ * A triangle consists of three edges that connect three vertices with each other.
+ * 
+ * <p>
+ * The algorithm works as follows: 
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
+ * that closes the triangle.
+ *  
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Edges are represented as pairs for vertex IDs which are separated by space 
+ * characters. Edges are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
+ * that include a triangle
+ * </ul>
+ * <pre>
+ *     (1)
+ *     /  \
+ *   (2)-(12)
+ * </pre>
+ * 
+ * Usage: <code>EnumTriangleBasic &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}. 
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Custom Java objects which extend Tuple
+ * <li>Group Sorting
+ * </ul>
+ * 
+ */
+@SuppressWarnings("serial")
+public class EnumTrianglesBasic {
+
+	static boolean fileOutput = false;
+	static String edgePath = null;
+	static String outputPath = null;
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+	
+		// read input data
+		DataSet<Edge> edges = getEdgeDataSet(env);
+		
+		// project edges by vertex id
+		DataSet<Edge> edgesById = edges
+				.map(new EdgeByIdProjector());
+		
+		DataSet<Triad> triangles = edgesById
+				// build triads
+				.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
+				// filter triads
+				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(Edge.V1, Edge.V2).with(new TriadFilter());
+
+		// emit result
+		if(fileOutput) {
+			triangles.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			triangles.print();
+		}
+
+		// execute program
+		env.execute("Basic Triangle Enumeration Example");
+
+	}
+	
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	/** Converts a Tuple2 into an Edge */
+	public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
+		private final Edge outEdge = new Edge();
+		
+		@Override
+		public Edge map(Tuple2<Integer, Integer> t) throws Exception {
+			outEdge.copyVerticesFromTuple2(t);
+			return outEdge;
+		}
+	}
+	
+	/** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */
+	private static class EdgeByIdProjector implements MapFunction<Edge, Edge> {
+	
+		@Override
+		public Edge map(Edge inEdge) throws Exception {
+			
+			// flip vertices if necessary
+			if(inEdge.getFirstVertex() > inEdge.getSecondVertex()) {
+				inEdge.flipVertices();
+			}
+			
+			return inEdge;
+		}
+	}
+	
+	/**
+	 *  Builds triads (triples of vertices) from pairs of edges that share a vertex.
+	 *  The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. 
+	 *  Assumes that input edges share the first vertex and are in ascending order of the second vertex.
+	 */
+	private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
+		private final List<Integer> vertices = new ArrayList<Integer>();
+		private final Triad outTriad = new Triad();
+		
+		@Override
+		public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
+			
+			final Iterator<Edge> edges = edgesIter.iterator();
+			
+			// clear vertex list
+			vertices.clear();
+			
+			// read first edge
+			Edge firstEdge = edges.next();
+			outTriad.setFirstVertex(firstEdge.getFirstVertex());
+			vertices.add(firstEdge.getSecondVertex());
+			
+			// build and emit triads
+			while (edges.hasNext()) {
+				Integer higherVertexId = edges.next().getSecondVertex();
+				
+				// combine vertex with all previously read vertices
+				for (Integer lowerVertexId : vertices) {
+					outTriad.setSecondVertex(lowerVertexId);
+					outTriad.setThirdVertex(higherVertexId);
+					out.collect(outTriad);
+				}
+				vertices.add(higherVertexId);
+			}
+		}
+	}
+	
+	/** Filters triads (three vertices connected by two edges) without a closing third edge. */
+	private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> {
+		
+		@Override
+		public Triad join(Triad triad, Edge edge) throws Exception {
+			return triad;
+		}
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean parseParameters(String[] args) {
+	
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				edgePath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: EnumTriangleBasic <edge path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing Enum Triangles Basic example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>");
+		}
+		return true;
+	}
+	
+	private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return env.readCsvFile(edgePath)
+						.fieldDelimiter(' ')
+						.includeFields(true, true)
+						.types(Integer.class, Integer.class)
+						.map(new TupleEdgeConverter());
+		} else {
+			return EnumTrianglesData.getDefaultEdgeDataSet(env);
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
new file mode 100644
index 0000000..9dcb168
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.graph;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.EdgeWithDegrees;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
+
+/**
+ * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
+ * A triangle consists of three edges that connect three vertices with each other.
+ * 
+ * <p>
+ * The basic algorithm works as follows: 
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
+ * that closes the triangle.
+ * 
+ * <p>
+ * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
+ * Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to 
+ * reduce the number of triads. 
+ * This implementation extends the basic algorithm by computing output degrees of edge vertices and 
+ * grouping on edges on the vertex with the smaller degree.
+ * 
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Edges are represented as pairs for vertex IDs which are separated by space 
+ * characters. Edges are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
+ * that include a triangle
+ * </ul>
+ * <pre>
+ *     (1)
+ *     /  \
+ *   (2)-(12)
+ * </pre>
+ * 
+ * Usage: <code>EnumTriangleOpt &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}.
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Custom Java objects which extend Tuple
+ * <li>Group Sorting
+ * </ul>
+ * 
+ */
+@SuppressWarnings("serial")
+public class EnumTrianglesOpt {
+
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// read input data
+		DataSet<Edge> edges = getEdgeDataSet(env);
+		
+		// annotate edges with degrees
+		DataSet<EdgeWithDegrees> edgesWithDegrees = edges
+				.flatMap(new EdgeDuplicator())
+				.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new DegreeCounter())
+				.groupBy(EdgeWithDegrees.V1,EdgeWithDegrees.V2).reduce(new DegreeJoiner());
+		
+		// project edges by degrees
+		DataSet<Edge> edgesByDegree = edgesWithDegrees
+				.map(new EdgeByDegreeProjector());
+		// project edges by vertex id
+		DataSet<Edge> edgesById = edgesByDegree
+				.map(new EdgeByIdProjector());
+		
+		DataSet<Triad> triangles = edgesByDegree
+				// build triads
+				.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
+				// filter triads
+				.join(edgesById).where(Triad.V2,Triad.V3).equalTo(Edge.V1,Edge.V2).with(new TriadFilter());
+
+		// emit result
+		if(fileOutput) {
+			triangles.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			triangles.print();
+		}
+		
+		// execute program
+		env.execute("Triangle Enumeration Example");
+		
+	}
+	
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+	
+	/** Converts a Tuple2 into an Edge */
+	public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
+		private final Edge outEdge = new Edge();
+		
+		@Override
+		public Edge map(Tuple2<Integer, Integer> t) throws Exception {
+			outEdge.copyVerticesFromTuple2(t);
+			return outEdge;
+		}
+	}
+	
+	/** Emits for an edge the original edge and its switched version. */
+	private static class EdgeDuplicator implements FlatMapFunction<Edge, Edge> {
+		
+		@Override
+		public void flatMap(Edge edge, Collector<Edge> out) throws Exception {
+			out.collect(edge);
+			edge.flipVertices();
+			out.collect(edge);
+		}
+	}
+	
+	/**
+	 * Counts the number of edges that share a common vertex.
+	 * Emits one edge for each input edge with a degree annotation for the shared vertex.
+	 * For each emitted edge, the first vertex is the vertex with the smaller id.
+	 */
+	private static class DegreeCounter implements GroupReduceFunction<Edge, EdgeWithDegrees> {
+		
+		final ArrayList<Integer> otherVertices = new ArrayList<Integer>();
+		final EdgeWithDegrees outputEdge = new EdgeWithDegrees();
+		
+		@Override
+		public void reduce(Iterable<Edge> edgesIter, Collector<EdgeWithDegrees> out) {
+			
+			Iterator<Edge> edges = edgesIter.iterator();
+			otherVertices.clear();
+			
+			// get first edge
+			Edge edge = edges.next();
+			Integer groupVertex = edge.getFirstVertex();
+			this.otherVertices.add(edge.getSecondVertex());
+			
+			// get all other edges (assumes edges are sorted by second vertex)
+			while (edges.hasNext()) {
+				edge = edges.next();
+				Integer otherVertex = edge.getSecondVertex();
+				// collect unique vertices
+				if(!otherVertices.contains(otherVertex) && otherVertex != groupVertex) {
+					this.otherVertices.add(otherVertex);
+				}
+			}
+			int degree = this.otherVertices.size();
+			
+			// emit edges
+			for(Integer otherVertex : this.otherVertices) {
+				if(groupVertex < otherVertex) {
+					outputEdge.setFirstVertex(groupVertex);
+					outputEdge.setFirstDegree(degree);
+					outputEdge.setSecondVertex(otherVertex);
+					outputEdge.setSecondDegree(0);
+				} else {
+					outputEdge.setFirstVertex(otherVertex);
+					outputEdge.setFirstDegree(0);
+					outputEdge.setSecondVertex(groupVertex);
+					outputEdge.setSecondDegree(degree);
+				}
+				out.collect(outputEdge);
+			}
+		}
+	}
+	
+	/**
+	 * Builds an edge with degree annotation from two edges that have the same vertices and only one 
+	 * degree annotation.
+	 */
+	private static class DegreeJoiner implements ReduceFunction<EdgeWithDegrees> {
+		private final EdgeWithDegrees outEdge = new EdgeWithDegrees();
+		
+		@Override
+		public EdgeWithDegrees reduce(EdgeWithDegrees edge1, EdgeWithDegrees edge2) throws Exception {
+			
+			// copy first edge
+			outEdge.copyFrom(edge1);
+			
+			// set missing degree
+			if(edge1.getFirstDegree() == 0 && edge1.getSecondDegree() != 0) {
+				outEdge.setFirstDegree(edge2.getFirstDegree());
+			} else if (edge1.getFirstDegree() != 0 && edge1.getSecondDegree() == 0) {
+				outEdge.setSecondDegree(edge2.getSecondDegree());
+			}
+			return outEdge;
+		}
+	}
+		
+	/** Projects an edge (pair of vertices) such that the first vertex is the vertex with the smaller degree. */
+	private static class EdgeByDegreeProjector implements MapFunction<EdgeWithDegrees, Edge> {
+		
+		private final Edge outEdge = new Edge();
+		
+		@Override
+		public Edge map(EdgeWithDegrees inEdge) throws Exception {
+
+			// copy vertices to simple edge
+			outEdge.copyVerticesFromEdgeWithDegrees(inEdge);
+
+			// flip vertices if first degree is larger than second degree.
+			if(inEdge.getFirstDegree() > inEdge.getSecondDegree()) {
+				outEdge.flipVertices();
+			}
+
+			// return edge
+			return outEdge;
+		}
+	}
+	
+	/** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */
+	private static class EdgeByIdProjector implements MapFunction<Edge, Edge> {
+	
+		@Override
+		public Edge map(Edge inEdge) throws Exception {
+			
+			// flip vertices if necessary
+			if(inEdge.getFirstVertex() > inEdge.getSecondVertex()) {
+				inEdge.flipVertices();
+			}
+			
+			return inEdge;
+		}
+	}
+	
+	/**
+	 *  Builds triads (triples of vertices) from pairs of edges that share a vertex.
+	 *  The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. 
+	 *  Assumes that input edges share the first vertex and are in ascending order of the second vertex.
+	 */
+	private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
+		
+		private final List<Integer> vertices = new ArrayList<Integer>();
+		private final Triad outTriad = new Triad();
+		
+		@Override
+		public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
+			final Iterator<Edge> edges = edgesIter.iterator();
+			
+			// clear vertex list
+			vertices.clear();
+			
+			// read first edge
+			Edge firstEdge = edges.next();
+			outTriad.setFirstVertex(firstEdge.getFirstVertex());
+			vertices.add(firstEdge.getSecondVertex());
+			
+			// build and emit triads
+			while (edges.hasNext()) {
+				Integer higherVertexId = edges.next().getSecondVertex();
+				
+				// combine vertex with all previously read vertices
+				for(Integer lowerVertexId : vertices) {
+					outTriad.setSecondVertex(lowerVertexId);
+					outTriad.setThirdVertex(higherVertexId);
+					out.collect(outTriad);
+				}
+				vertices.add(higherVertexId);
+			}
+		}
+	}
+	
+	/** Filters triads (three vertices connected by two edges) without a closing third edge. */
+	private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> {
+		
+		@Override
+		public Triad join(Triad triad, Edge edge) throws Exception {
+			return triad;
+		}
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String edgePath = null;
+	private static String outputPath = null;
+	
+	private static boolean parseParameters(String[] args) {
+		
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				edgePath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: EnumTriangleBasic <edge path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing Enum Triangles Opt example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: EnumTriangleOpt <edge path> <result path>");
+		}
+		return true;
+	}
+	
+	private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return env.readCsvFile(edgePath)
+						.fieldDelimiter(' ')
+						.includeFields(true, true)
+						.types(Integer.class, Integer.class)
+						.map(new TupleEdgeConverter());
+		} else {
+			return EnumTrianglesData.getDefaultEdgeDataSet(env);
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
new file mode 100644
index 0000000..4a491b4
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.graph;
+
+import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
+
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.examples.java.graph.util.PageRankData;
+
+/**
+ * A basic implementation of the Page Rank algorithm using a bulk iteration.
+ * 
+ * <p>
+ * This implementation requires a set of pages and a set of directed links as input and works as follows. <br> 
+ * In each iteration, the rank of every page is evenly distributed to all pages it points to.
+ * Each page collects the partial ranks of all pages that point to it, sums them up, and applies a dampening factor to the sum.
+ * The result is the new rank of the page. A new iteration is started with the new ranks of all pages.
+ * This implementation terminates after a fixed number of iterations.<br>
+ * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/Page_rank">Page Rank algorithm</a>. 
+ * 
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Pages represented as an (long) ID separated by new-line characters.<br> 
+ * For example <code>"1\n2\n12\n42\n63\n"</code> gives five pages with IDs 1, 2, 12, 42, and 63.
+ * <li>Links are represented as pairs of page IDs which are separated by space 
+ * characters. Links are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).<br>
+ * For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).
+ * </ul>
+ * 
+ * <p>
+ * Usage: <code>PageRankBasic &lt;pages path&gt; &lt;links path&gt; &lt;output path&gt; &lt;num pages&gt; &lt;num iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link PageRankData} and 10 iterations.
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Bulk Iterations
+ * <li>Default Join
+ * <li>Configure user-defined functions using constructor parameters.
+ * </ul> 
+ * 
+ *
+ */
+@SuppressWarnings("serial")
+public class PageRankBasic {
+	
+	private static final double DAMPENING_FACTOR = 0.85;
+	private static final double EPSILON = 0.0001;
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data
+		DataSet<Long> pagesInput = getPagesDataSet(env);
+		DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env);
+		
+		// assign initial rank to pages
+		DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
+				map(new RankAssigner((1.0d / numPages)));
+		
+		// build adjacency list from link input
+		DataSet<Tuple2<Long, Long[]>> adjacencyListInput = 
+				linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
+		
+		// set iterative data set
+		IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
+		
+		DataSet<Tuple2<Long, Double>> newRanks = iteration
+				// join pages with outgoing edges and distribute rank
+				.join(adjacencyListInput).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
+				// collect and sum ranks
+				.groupBy(0).aggregate(SUM, 1)
+				// apply dampening factor
+				.map(new Dampener(DAMPENING_FACTOR, numPages));
+		
+		DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
+				newRanks, 
+				newRanks.join(iteration).where(0).equalTo(0)
+				// termination condition
+				.filter(new EpsilonFilter()));
+
+		// emit result
+		if(fileOutput) {
+			finalPageRanks.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			finalPageRanks.print();
+		}
+
+		// execute program
+		env.execute("Basic Page Rank Example");
+		
+	}
+	
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	/** 
+	 * A map function that assigns an initial rank to all pages. 
+	 */
+	public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> {
+		Tuple2<Long, Double> outPageWithRank;
+		
+		public RankAssigner(double rank) {
+			this.outPageWithRank = new Tuple2<Long, Double>(-1l, rank);
+		}
+		
+		@Override
+		public Tuple2<Long, Double> map(Long page) {
+			outPageWithRank.f0 = page;
+			return outPageWithRank;
+		}
+	}
+	
+	/**
+	 * A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges
+	 * originate. Run as a pre-processing step.
+	 */
+	@ConstantFields("0")
+	public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
+		
+		private final ArrayList<Long> neighbors = new ArrayList<Long>();
+		
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
+			neighbors.clear();
+			Long id = 0L;
+			
+			for (Tuple2<Long, Long> n : values) {
+				id = n.f0;
+				neighbors.add(n.f1);
+			}
+			out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()])));
+		}
+	}
+	
+	/**
+	 * Join function that distributes a fraction of a vertex's rank to all neighbors.
+	 */
+	public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> {
+
+		@Override
+		public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out){
+			Long[] neigbors = value.f1.f1;
+			double rank = value.f0.f1;
+			double rankToDistribute = rank / ((double) neigbors.length);
+				
+			for (int i = 0; i < neigbors.length; i++) {
+				out.collect(new Tuple2<Long, Double>(neigbors[i], rankToDistribute));
+			}
+		}
+	}
+	
+	/**
+	 * The function that applies the page rank dampening formula
+	 */
+	@ConstantFields("0")
+	public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
+
+		private final double dampening;
+		private final double randomJump;
+		
+		public Dampener(double dampening, double numVertices) {
+			this.dampening = dampening;
+			this.randomJump = (1 - dampening) / numVertices;
+		}
+
+		@Override
+		public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
+			value.f1 = (value.f1 * dampening) + randomJump;
+			return value;
+		}
+	}
+	
+	/**
+	 * Filter that filters vertices where the rank difference is below a threshold.
+	 */
+	public static final class EpsilonFilter implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
+
+		@Override
+		public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
+			return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
+		}
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String pagesInputPath = null;
+	private static String linksInputPath = null;
+	private static String outputPath = null;
+	private static long numPages = 0;
+	private static int maxIterations = 10;
+	
+	private static boolean parseParameters(String[] args) {
+		
+		if(args.length > 0) {
+			if(args.length == 5) {
+				fileOutput = true;
+				pagesInputPath = args[0];
+				linksInputPath = args[1];
+				outputPath = args[2];
+				numPages = Integer.parseInt(args[3]);
+				maxIterations = Integer.parseInt(args[4]);
+			} else {
+				System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing PageRank Basic example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+			
+			numPages = PageRankData.getNumberOfPages();
+		}
+		return true;
+	}
+	
+	private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return env
+						.readCsvFile(pagesInputPath)
+							.fieldDelimiter(' ')
+							.lineDelimiter("\n")
+							.types(Long.class)
+						.map(new MapFunction<Tuple1<Long>, Long>() {
+							@Override
+							public Long map(Tuple1<Long> v) { return v.f0; }
+						});
+		} else {
+			return PageRankData.getDefaultPagesDataSet(env);
+		}
+	}
+	
+	private static DataSet<Tuple2<Long, Long>> getLinksDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return env.readCsvFile(linksInputPath)
+						.fieldDelimiter(' ')
+						.lineDelimiter("\n")
+						.types(Long.class, Long.class);
+		} else {
+			return PageRankData.getDefaultEdgeDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
new file mode 100644
index 0000000..30230d6
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.graph;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
+import org.apache.flink.util.Collector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+@SuppressWarnings("serial")
+public class TransitiveClosureNaive implements ProgramDescription {
+
+
+	public static void main (String... args) throws Exception{
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env);
+
+		IterativeDataSet<Tuple2<Long,Long>> paths = edges.iterate(maxIterations);
+
+		DataSet<Tuple2<Long,Long>> nextPaths = paths
+				.join(edges)
+				.where(1)
+				.equalTo(0)
+				.with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+					@Override
+					/**
+						left: Path (z,x) - x is reachable by z
+						right: Edge (x,y) - edge x-->y exists
+						out: Path (z,y) - y is reachable by z
+					 */
+					public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
+						return new Tuple2<Long, Long>(
+								new Long(left.f0),
+								new Long(right.f1));
+					}
+				})
+				.union(paths)
+				.groupBy(0, 1)
+				.reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+					@Override
+					public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+						out.collect(values.iterator().next());
+					}
+				});
+
+		DataSet<Tuple2<Long,Long>> newPaths = paths
+				.coGroup(nextPaths)
+				.where(0).equalTo(0)
+				.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+					Set prevSet = new HashSet<Tuple2<Long,Long>>();
+					@Override
+					public void coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception {
+						for (Tuple2<Long,Long> prev : prevPaths) {
+							prevSet.add(prev);
+						}
+						for (Tuple2<Long,Long> next: nextPaths) {
+							if (!prevSet.contains(next)) {
+								out.collect(next);
+							}
+						}
+					}
+				});
+
+		DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths);
+
+
+		// emit result
+		if (fileOutput) {
+			transitiveClosure.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			transitiveClosure.print();
+		}
+
+		// execute program
+		env.execute("Transitive Closure Example");
+
+	}
+
+	@Override
+	public String getDescription() {
+		return "Parameters: <edges-path> <result-path> <max-number-of-iterations>";
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String edgesPath = null;
+	private static String outputPath = null;
+	private static int maxIterations = 10;
+
+	private static boolean parseParameters(String[] programArguments) {
+
+		if (programArguments.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (programArguments.length == 3) {
+				edgesPath = programArguments[0];
+				outputPath = programArguments[1];
+				maxIterations = Integer.parseInt(programArguments[2]);
+			} else {
+				System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+		}
+		return true;
+	}
+
+
+	private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
+		} else {
+			return ConnectedComponentsData.getDefaultEdgeDataSet(env);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
new file mode 100644
index 0000000..27c7d45
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.graph.util;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Provides the default data sets used for the Connected Components example program.
+ * The default data sets are used, if no parameters are given to the program.
+ *
+ */
+public class ConnectedComponentsData {
+	
+	public static final Object[][] VERTICES  = new Object[][] {
+		new Object[]{1L}, new Object[]{2L}, new Object[]{3L}, new Object[]{4L}, 
+		new Object[]{5L},new Object[]{6L}, new Object[]{7L}, new Object[]{8L}, 
+		new Object[]{9L}, new Object[]{10L}, new Object[]{11L}, new Object[]{12L}, 
+		new Object[]{13L}, new Object[]{14L}, new Object[]{15L}, new Object[]{16L}
+	};
+
+	public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env) {
+		List<Long> verticesList = new LinkedList<Long>();
+		for (Object[] vertex : VERTICES) {
+			verticesList.add((Long) vertex[0]);
+		}
+		return env.fromCollection(verticesList);
+	}
+	
+	public static final Object[][] EDGES = new Object[][] {
+		new Object[]{1L, 2L},
+		new Object[]{2L, 3L},
+		new Object[]{2L, 4L},
+		new Object[]{3L, 5L},
+		new Object[]{6L, 7L},
+		new Object[]{8L, 9L},
+		new Object[]{8L, 10L},
+		new Object[]{5L, 11L},
+		new Object[]{11L, 12L},
+		new Object[]{10L, 13L},
+		new Object[]{9L, 14L},
+		new Object[]{13L, 14L},
+		new Object[]{1L, 15L},
+		new Object[]{16L, 1L}
+	};
+	
+	public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		
+		List<Tuple2<Long, Long>> edgeList = new LinkedList<Tuple2<Long, Long>>();
+		for (Object[] edge : EDGES) {
+			edgeList.add(new Tuple2<Long, Long>((Long) edge[0], (Long) edge[1]));
+		}
+		return env.fromCollection(edgeList);
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
new file mode 100644
index 0000000..257b7d8
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.graph.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
+
+/**
+ * Provides the default data sets used for the Triangle Enumeration example programs.
+ * The default data sets are used, if no parameters are given to the program.
+ *
+ */
+public class EnumTrianglesData {
+
+	public static final Object[][] EDGES = {
+		{1, 2},
+		{1, 3},
+		{1 ,4},
+		{1, 5},
+		{2, 3},
+		{2, 5},
+		{3, 4},
+		{3, 7},
+		{3, 8},
+		{5, 6},
+		{7, 8}
+	};
+	
+	public static DataSet<Edge> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		
+		List<Edge> edges = new ArrayList<Edge>();
+		for(Object[] e : EDGES) {
+			edges.add(new Edge((Integer)e[0], (Integer)e[1]));
+		}
+		
+		return env.fromCollection(edges);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
new file mode 100644
index 0000000..d7eefae
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.graph.util;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+
+public class EnumTrianglesDataTypes {
+
+	public static class Edge extends Tuple2<Integer, Integer> {
+		private static final long serialVersionUID = 1L;
+		
+		public static final int V1 = 0;
+		public static final int V2 = 1;
+		
+		public Edge() {}
+		
+		public Edge(final Integer v1, final Integer v2) {
+			this.setFirstVertex(v1);
+			this.setSecondVertex(v2);
+		}
+		
+		public Integer getFirstVertex() { return this.getField(V1); }
+		
+		public Integer getSecondVertex() { return this.getField(V2); }
+		
+		public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
+		
+		public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
+		
+		public void copyVerticesFromTuple2(Tuple2<Integer, Integer> t) {
+			this.setFirstVertex(t.f0);
+			this.setSecondVertex(t.f1);
+		}
+		
+		public void copyVerticesFromEdgeWithDegrees(EdgeWithDegrees ewd) {
+			this.setFirstVertex(ewd.getFirstVertex());
+			this.setSecondVertex(ewd.getSecondVertex());
+		}
+		
+		public void flipVertices() {
+			Integer tmp = this.getFirstVertex();
+			this.setFirstVertex(this.getSecondVertex());
+			this.setSecondVertex(tmp);
+		}
+	}
+	
+	public static class Triad extends Tuple3<Integer, Integer, Integer> {
+		private static final long serialVersionUID = 1L;
+		
+		public static final int V1 = 0;
+		public static final int V2 = 1;
+		public static final int V3 = 2;
+		
+		public Triad() {}
+		
+		public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
+		
+		public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
+		
+		public void setThirdVertex(final Integer vertex3) { this.setField(vertex3, V3); }
+	}
+	
+	public static class EdgeWithDegrees extends Tuple4<Integer, Integer, Integer, Integer> {
+		private static final long serialVersionUID = 1L;
+
+		public static final int V1 = 0;
+		public static final int V2 = 1;
+		public static final int D1 = 2;
+		public static final int D2 = 3;
+		
+		public EdgeWithDegrees() { }
+			
+		public Integer getFirstVertex() { return this.getField(V1); }
+		
+		public Integer getSecondVertex() { return this.getField(V2); }
+		
+		public Integer getFirstDegree() { return this.getField(D1); }
+		
+		public Integer getSecondDegree() { return this.getField(D2); }
+		
+		public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
+		
+		public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
+		
+		public void setFirstDegree(final Integer degree1) { this.setField(degree1, D1); }
+		
+		public void setSecondDegree(final Integer degree2) { this.setField(degree2, D2); }
+		
+		public void copyFrom(final EdgeWithDegrees edge) {
+			this.setFirstVertex(edge.getFirstVertex());
+			this.setSecondVertex(edge.getSecondVertex());
+			this.setFirstDegree(edge.getFirstDegree());
+			this.setSecondDegree(edge.getSecondDegree());
+		}
+	}
+	
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
new file mode 100644
index 0000000..d6ee274
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.graph.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * Provides the default data sets used for the PageRank example program.
+ * The default data sets are used, if no parameters are given to the program.
+ *
+ */
+public class PageRankData {
+
+	public static final Object[][] EDGES = {
+		{1L, 2L},
+		{1L, 15L},
+		{2L, 3L},
+		{2L, 4L},
+		{2L, 5L},
+		{2L, 6L},
+		{2L, 7L},
+		{3L, 13L},
+		{4L, 2L},
+		{5L, 11L},
+		{5L, 12L},
+		{6L, 1L},
+		{6L, 7L},
+		{6L, 8L},
+		{7L, 1L},
+		{7L, 8L},
+		{8L, 1L},
+		{8L, 9L},
+		{8L, 10L},
+		{9L, 14L},
+		{9L, 1L},
+		{10L, 1L},
+		{10L, 13L},
+		{11L, 12L},
+		{11L, 1L},
+		{12L, 1L},
+		{13L, 14L},
+		{14L, 12L},
+		{15L, 1L},
+	};
+	
+	private static long numPages = 15;
+	
+	public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		
+		List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, Long>>();
+		for(Object[] e : EDGES) {
+			edges.add(new Tuple2<Long, Long>((Long)e[0], (Long)e[1]));
+		}
+		return env.fromCollection(edges);
+	}
+	
+	public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env) {
+		return env.generateSequence(1, 15);
+	}
+	
+	public static long getNumberOfPages() {
+		return numPages;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
new file mode 100644
index 0000000..0d4fd3e
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.misc;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/** 
+ * Estimates the value of Pi using the Monte Carlo method.
+ * The area of a circle is Pi * R^2, R being the radius of the circle 
+ * The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
+ * 
+ * Thus Pi = 4 * (area of circle / area of square).
+ * 
+ * The idea is to find a way to estimate the circle to square area ratio.
+ * The Monte Carlo method suggests collecting random points (within the square)
+ * and then counting the number of points that fall within the circle
+ * 
+ * <pre>
+ * {@code
+ * x = Math.random()
+ * y = Math.random()
+ * 
+ * x * x + y * y < 1
+ * }
+ * </pre>
+ */
+@SuppressWarnings("serial")
+public class PiEstimation implements java.io.Serializable {
+	
+	
+	public static void main(String[] args) throws Exception {
+
+		final long numSamples = args.length > 0 ? Long.parseLong(args[0]) : 1000000;
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// count how many of the samples would randomly fall into
+		// the unit circle
+		DataSet<Long> count = 
+				env.generateSequence(1, numSamples)
+				.map(new Sampler())
+				.reduce(new SumReducer());
+
+		// the ratio of the unit circle surface to 4 times the unit square is pi
+		DataSet<Double> pi = count
+				.map(new MapFunction<Long, Double>() {
+					public Double map(Long value) {
+						return value * 4.0 / numSamples;
+					}
+				});
+
+		System.out.println("We estimate Pi to be:");
+		pi.print();
+
+		env.execute();
+	}
+
+	//*************************************************************************
+	//     USER FUNCTIONS
+	//*************************************************************************
+	
+	
+	/** 
+	 * Sampler randomly emits points that fall within a square of edge x * y.
+	 * It calculates the distance to the center of a virtually centered circle of radius x = y = 1
+	 * If the distance is less than 1, then and only then does it returns a 1.
+	 */
+	public static class Sampler implements MapFunction<Long, Long> {
+
+		@Override
+		public Long map(Long value) throws Exception{
+			double x = Math.random();
+			double y = Math.random();
+			return (x * x + y * y) < 1 ? 1L : 0L;
+		}
+	}
+
+	
+	/** 
+	 * Simply sums up all long values.
+	 */
+	public static final class SumReducer implements ReduceFunction<Long>{
+
+		@Override
+		public Long reduce(Long value1, Long value2) throws Exception {
+			return value1 + value2;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
new file mode 100644
index 0000000..7940310
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.ml;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.java.ml.util.LinearRegressionData;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+
+/**
+ * This example implements a basic Linear Regression  to solve the y = theta0 + theta1*x problem using batch gradient descent algorithm.
+ *
+ * <p>
+ * Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering algorithm and works as follows:<br>
+ * Giving a data set and target set, the BGD try to find out the best parameters for the data set to fit the target set.
+ * In each iteration, the algorithm computes the gradient of the cost function and use it to update all the parameters.
+ * The algorithm terminates after a fixed number of iterations (as in this implementation)
+ * With enough iteration, the algorithm can minimize the cost function and find the best parameters
+ * This is the Wikipedia entry for the <a href = "http://en.wikipedia.org/wiki/Linear_regression">Linear regression</a> and <a href = "http://en.wikipedia.org/wiki/Gradient_descent">Gradient descent algorithm</a>.
+ * 
+ * <p>
+ * This implementation works on one-dimensional data. And find the two-dimensional theta.<br>
+ * It find the best Theta parameter to fit the target.
+ * 
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Data points are represented as two double values separated by a blank character. The first one represent the X(the training data) and the second represent the Y(target).
+ * Data points are separated by newline characters.<br>
+ * For example <code>"-0.02 -0.04\n5.3 10.6\n"</code> gives two data points (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
+ * </ul>
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li> Bulk iterations
+ * <li> Broadcast variables in bulk iterations
+ * <li> Custom Java objects (PoJos)
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class LinearRegression {
+
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception{
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// set up execution environment
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// get input x data from elements
+		DataSet<Data> data = getDataSet(env);
+
+		// get the parameters from elements
+		DataSet<Params> parameters = getParamsDataSet(env);
+
+		// set number of bulk iterations for SGD linear Regression
+		IterativeDataSet<Params> loop = parameters.iterate(numIterations);
+
+		DataSet<Params> new_parameters = data
+				// compute a single step using every sample
+				.map(new SubUpdate()).withBroadcastSet(loop, "parameters")
+				// sum up all the steps
+				.reduce(new UpdateAccumulator())
+				// average the steps and update all parameters
+				.map(new Update());
+
+		// feed new parameters back into next iteration
+		DataSet<Params> result = loop.closeWith(new_parameters);
+
+		// emit result
+		if(fileOutput) {
+			result.writeAsText(outputPath);
+		} else {
+			result.print();
+		}
+
+		// execute program
+		env.execute("Linear Regression example");
+
+	}
+
+	// *************************************************************************
+	//     DATA TYPES
+	// *************************************************************************
+
+	/**
+	 * A simple data sample, x means the input, and y means the target.
+	 */
+	public static class Data implements Serializable{
+		public double x,y;
+
+		public Data() {};
+
+		public Data(double x ,double y){
+			this.x = x;
+			this.y = y;
+		}
+
+		@Override
+		public String toString() {
+			return "(" + x + "|" + y + ")";
+		}
+
+	}
+
+	/**
+	 * A set of parameters -- theta0, theta1.
+	 */
+	public static class Params implements Serializable{
+
+		private double theta0,theta1;
+
+		public Params(){};
+
+		public Params(double x0, double x1){
+			this.theta0 = x0;
+			this.theta1 = x1;
+		}
+
+		@Override
+		public String toString() {
+			return theta0 + " " + theta1;
+		}
+
+		public double getTheta0() {
+			return theta0;
+		}
+
+		public double getTheta1() {
+			return theta1;
+		}
+
+		public void setTheta0(double theta0) {
+			this.theta0 = theta0;
+		}
+
+		public void setTheta1(double theta1) {
+			this.theta1 = theta1;
+		}
+
+		public Params div(Integer a){
+			this.theta0 = theta0 / a ;
+			this.theta1 = theta1 / a ;
+			return this;
+		}
+
+	}
+
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	/** Converts a Tuple2<Double,Double> into a Data. */
+	public static final class TupleDataConverter implements MapFunction<Tuple2<Double, Double>, Data> {
+
+		@Override
+		public Data map(Tuple2<Double, Double> t) throws Exception {
+			return new Data(t.f0, t.f1);
+		}
+	}
+
+	/** Converts a Tuple2<Double,Double> into a Params. */
+	public static final class TupleParamsConverter implements MapFunction<Tuple2<Double, Double>,Params> {
+
+		@Override
+		public Params map(Tuple2<Double, Double> t)throws Exception {
+			return new Params(t.f0,t.f1);
+		}
+	}
+
+	/**
+	 * Compute a single BGD type update for every parameters.
+	 */
+	public static class SubUpdate extends RichMapFunction<Data,Tuple2<Params,Integer>> {
+
+		private Collection<Params> parameters; 
+
+		private Params parameter;
+
+		private int count = 1;
+
+		/** Reads the parameters from a broadcast variable into a collection. */
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			this.parameters = getRuntimeContext().getBroadcastVariable("parameters");
+		}
+
+		@Override
+		public Tuple2<Params, Integer> map(Data in) throws Exception {
+
+			for(Params p : parameters){
+				this.parameter = p; 
+			}
+
+			double theta_0 = parameter.theta0 - 0.01*((parameter.theta0 + (parameter.theta1*in.x)) - in.y);
+			double theta_1 = parameter.theta1 - 0.01*(((parameter.theta0 + (parameter.theta1*in.x)) - in.y) * in.x);
+
+			return new Tuple2<Params,Integer>(new Params(theta_0,theta_1),count);
+		}
+	}
+
+	/**  
+	 * Accumulator all the update.
+	 * */
+	public static class UpdateAccumulator implements ReduceFunction<Tuple2<Params, Integer>> {
+
+		@Override
+		public Tuple2<Params, Integer> reduce(Tuple2<Params, Integer> val1, Tuple2<Params, Integer> val2) {
+
+			double new_theta0 = val1.f0.theta0 + val2.f0.theta0;
+			double new_theta1 = val1.f0.theta1 + val2.f0.theta1;
+			Params result = new Params(new_theta0,new_theta1);
+			return new Tuple2<Params, Integer>( result, val1.f1 + val2.f1);
+
+		}
+	}
+
+	/**
+	 * Compute the final update by average them.
+	 */
+	public static class Update implements MapFunction<Tuple2<Params, Integer>,Params> {
+
+		@Override
+		public Params map(Tuple2<Params, Integer> arg0) throws Exception {
+
+			return arg0.f0.div(arg0.f1);
+
+		}
+
+	}
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String dataPath = null;
+	private static String outputPath = null;
+	private static int numIterations = 10;
+
+	private static boolean parseParameters(String[] programArguments) {
+
+		if(programArguments.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(programArguments.length == 3) {
+				dataPath = programArguments[0];
+				outputPath = programArguments[1];
+				numIterations = Integer.parseInt(programArguments[2]);
+			} else {
+				System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing Linear Regression example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  We provide a data generator to create synthetic input files for this program.");
+			System.out.println("  Usage: LinearRegression <data path> <result path> <num iterations>");
+		}
+		return true;
+	}
+
+	private static DataSet<Data> getDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			// read data from CSV file
+			return env.readCsvFile(dataPath)
+					.fieldDelimiter(' ')
+					.includeFields(true, true)
+					.types(Double.class, Double.class)
+					.map(new TupleDataConverter());
+		} else {
+			return LinearRegressionData.getDefaultDataDataSet(env);
+		}
+	}
+
+	private static DataSet<Params> getParamsDataSet(ExecutionEnvironment env) {
+
+		return LinearRegressionData.getDefaultParamsDataSet(env);
+
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
new file mode 100644
index 0000000..84d332f
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.ml.util;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.examples.java.ml.LinearRegression.Data;
+import org.apache.flink.examples.java.ml.LinearRegression.Params;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Linear Regression example
+ * program. The default data sets are used, if no parameters are given to the
+ * program.
+ */
+public class LinearRegressionData {
+
+	// We have the data as object arrays so that we can also generate Scala Data
+	// Sources from it.
+	public static final Object[][] PARAMS = new Object[][] { new Object[] {
+			0.0, 0.0 } };
+
+	public static final Object[][] DATA = new Object[][] {
+			new Object[] { 0.5, 1.0 }, new Object[] { 1.0, 2.0 },
+			new Object[] { 2.0, 4.0 }, new Object[] { 3.0, 6.0 },
+			new Object[] { 4.0, 8.0 }, new Object[] { 5.0, 10.0 },
+			new Object[] { 6.0, 12.0 }, new Object[] { 7.0, 14.0 },
+			new Object[] { 8.0, 16.0 }, new Object[] { 9.0, 18.0 },
+			new Object[] { 10.0, 20.0 }, new Object[] { -0.08, -0.16 },
+			new Object[] { 0.13, 0.26 }, new Object[] { -1.17, -2.35 },
+			new Object[] { 1.72, 3.45 }, new Object[] { 1.70, 3.41 },
+			new Object[] { 1.20, 2.41 }, new Object[] { -0.59, -1.18 },
+			new Object[] { 0.28, 0.57 }, new Object[] { 1.65, 3.30 },
+			new Object[] { -0.55, -1.08 } };
+
+	public static DataSet<Params> getDefaultParamsDataSet(
+			ExecutionEnvironment env) {
+		List<Params> paramsList = new LinkedList<Params>();
+		for (Object[] params : PARAMS) {
+			paramsList.add(new Params((Double) params[0], (Double) params[1]));
+		}
+		return env.fromCollection(paramsList);
+	}
+
+	public static DataSet<Data> getDefaultDataDataSet(ExecutionEnvironment env) {
+
+		List<Data> dataList = new LinkedList<Data>();
+		for (Object[] data : DATA) {
+			dataList.add(new Data((Double) data[0], (Double) data[1]));
+		}
+		return env.fromCollection(dataList);
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
new file mode 100644
index 0000000..148c607
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.ml.util;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.Locale;
+import java.util.Random;
+
+/**
+ * Generates data for the {@link org.apache.flink.examples.java.ml.LinearRegression} example program.
+ */
+public class LinearRegressionDataGenerator {
+
+	static {
+		Locale.setDefault(Locale.US);
+	}
+
+	private static final String POINTS_FILE = "data";
+	private static final long DEFAULT_SEED = 4650285087650871364L;
+	private static final int DIMENSIONALITY = 1;
+	private static final DecimalFormat FORMAT = new DecimalFormat("#0.00");
+	private static final char DELIMITER = ' ';
+
+	/**
+	 * Main method to generate data for the {@link org.apache.flink.examples.java.ml.LinearRegression} example program.
+	 * <p>
+	 * The generator creates to files:
+	 * <ul>
+	 * <li><code>{tmp.dir}/data</code> for the data points
+	 * </ul> 
+	 * 
+	 * @param args 
+	 * <ol>
+	 * <li>Int: Number of data points
+	 * <li><b>Optional</b> Long: Random seed
+	 * </ol>
+	 */
+	public static void main(String[] args) throws IOException {
+
+		// check parameter count
+		if (args.length < 1) {
+			System.out.println("LinearRegressionDataGenerator <numberOfDataPoints> [<seed>]");
+			System.exit(1);
+		}
+
+		// parse parameters
+		final int numDataPoints = Integer.parseInt(args[0]);
+		final long firstSeed = args.length > 1 ? Long.parseLong(args[4]) : DEFAULT_SEED;
+		final Random random = new Random(firstSeed);
+		final String tmpDir = System.getProperty("java.io.tmpdir");
+
+		// write the points out
+		BufferedWriter pointsOut = null;
+		try {
+			pointsOut = new BufferedWriter(new FileWriter(new File(tmpDir+"/"+POINTS_FILE)));
+			StringBuilder buffer = new StringBuilder();
+
+			// DIMENSIONALITY + 1 means that the number of x(dimensionality) and target y
+			double[] point = new double[DIMENSIONALITY+1];
+
+			for (int i = 1; i <= numDataPoints; i++) {
+				point[0] = random.nextGaussian();
+				point[1] = 2 * point[0] + 0.01*random.nextGaussian();
+				writePoint(point, buffer, pointsOut);
+			}
+
+		}
+		finally {
+			if (pointsOut != null) {
+				pointsOut.close();
+			}
+		}
+
+		System.out.println("Wrote "+numDataPoints+" data points to "+tmpDir+"/"+POINTS_FILE);
+	}
+
+
+	private static void writePoint(double[] data, StringBuilder buffer, BufferedWriter out) throws IOException {
+		buffer.setLength(0);
+
+		// write coordinates
+		for (int j = 0; j < data.length; j++) {
+			buffer.append(FORMAT.format(data[j]));
+			if(j < data.length - 1) {
+				buffer.append(DELIMITER);
+			}
+		}
+
+		out.write(buffer.toString());
+		out.newLine();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
new file mode 100644
index 0000000..6faafe0
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.java.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+
+/**
+ * This program filters lines from a CSV file with empty fields. In doing so, it counts the number of empty fields per
+ * column within a CSV file using a custom accumulator for vectors. In this context, empty fields are those, that at
+ * most contain whitespace characters like space and tab.
+ * <p>
+ * The input file is a plain text CSV file with the semicolon as field separator and double quotes as field delimiters
+ * and three columns. See {@link #getDataSet(ExecutionEnvironment)} for configuration.
+ * <p>
+ * Usage: <code>FilterAndCountIncompleteLines [&lt;input file path&gt; [&lt;result path&gt;]]</code> <br>
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>custom accumulators
+ * <li>tuple data types
+ * <li>inline-defined functions
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class EmptyFieldsCountAccumulator {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// get the data set
+		final DataSet<Tuple> file = getDataSet(env);
+
+		// filter lines with empty fields
+		final DataSet<Tuple> filteredLines = file.filter(new EmptyFieldFilter());
+
+		// Here, we could do further processing with the filtered lines...
+		
+		// output the filtered lines
+		if (outputPath == null) {
+			filteredLines.print();
+		} else {
+			filteredLines.writeAsCsv(outputPath);
+		}
+
+		// execute program
+		final JobExecutionResult result = env.execute("Accumulator example");
+
+		// get the accumulator result via its registration key
+		final List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
+		System.out.format("Number of detected empty fields per column: %s\n", emptyFields);
+
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static String filePath;
+	private static String outputPath;
+
+	private static boolean parseParameters(final String[] programArguments) {
+
+		if (programArguments.length >= 3) {
+			System.err.println("Usage: FilterAndCountIncompleteLines [<input file path> [<result path>]]");
+			return false;
+		}
+
+		if (programArguments.length >= 1) {
+			filePath = programArguments[0];
+			if (programArguments.length == 2) {
+				outputPath = programArguments[1];
+			}
+		}
+
+		return true;
+	}
+
+	@SuppressWarnings("unchecked")
+	private static DataSet<Tuple> getDataSet(final ExecutionEnvironment env) {
+
+		DataSet<? extends Tuple> source;
+		if (filePath == null) {
+			source = env.fromCollection(getExampleInputTuples());
+
+		} else {
+			source = env
+					.readCsvFile(filePath)
+					.fieldDelimiter(';')
+					.types(String.class, String.class, String.class);
+
+		}
+
+		return (DataSet<Tuple>) source;
+	}
+
+	private static Collection<Tuple3<String, String, String>> getExampleInputTuples() {
+		Collection<Tuple3<String, String, String>> inputTuples = new ArrayList<Tuple3<String, String, String>>();
+		inputTuples.add(new Tuple3<String, String, String>("John", "Doe", "Foo Str."));
+		inputTuples.add(new Tuple3<String, String, String>("Joe", "Johnson", ""));
+		inputTuples.add(new Tuple3<String, String, String>(null, "Kate Morn", "Bar Blvd."));
+		inputTuples.add(new Tuple3<String, String, String>("Tim", "Rinny", ""));
+		inputTuples.add(new Tuple3<String, String, String>("Alicia", "Jackson", "  "));
+		return inputTuples;
+	}
+
+	/**
+	 * This function filters all incoming tuples that have one or more empty fields.
+	 * In doing so, it also counts the number of empty fields per attribute with an accumulator (registered under 
+	 * {@link EmptyFieldsCountAccumulator#EMPTY_FIELD_ACCUMULATOR}).
+	 */
+	public static final class EmptyFieldFilter extends RichFilterFunction<Tuple> {
+
+		// create a new accumulator in each filter function instance
+		// accumulators can be merged later on
+		private final VectorAccumulator emptyFieldCounter = new VectorAccumulator();
+
+		@Override
+		public void open(final Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			// register the accumulator instance
+			getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
+					this.emptyFieldCounter);
+		}
+
+		@Override
+		public boolean filter(final Tuple t) {
+			boolean containsEmptyFields = false;
+
+			// iterate over the tuple fields looking for empty ones
+			for (int pos = 0; pos < t.getArity(); pos++) {
+
+				final String field = t.getField(pos);
+				if (field == null || field.trim().isEmpty()) {
+					containsEmptyFields = true;
+
+					// if an empty field is encountered, update the
+					// accumulator
+					this.emptyFieldCounter.add(pos);
+				}
+			}
+
+			return !containsEmptyFields;
+		}
+	}
+
+	/**
+	 * This accumulator lets you increase vector components distributedly. The {@link #add(Integer)} method lets you
+	 * increase the <i>n</i>-th vector component by 1, whereat <i>n</i> is the methods parameter. The size of the vector
+	 * is automatically managed.
+	 */
+	public static class VectorAccumulator implements Accumulator<Integer, List<Integer>> {
+
+		/** Stores the accumulated vector components. */
+		private final List<Integer> resultVector = new ArrayList<Integer>();
+
+		/**
+		 * Increases the result vector component at the specified position by 1.
+		 */
+		@Override
+		public void add(final Integer position) {
+			updateResultVector(position, 1);
+		}
+
+		/**
+		 * Increases the result vector component at the specified position by the specified delta.
+		 */
+		private void updateResultVector(final int position, final int delta) {
+			// inflate the vector to contain the given position
+			while (this.resultVector.size() <= position) {
+				this.resultVector.add(0);
+			}
+
+			// increment the component value
+			final int component = this.resultVector.get(position);
+			this.resultVector.set(position, component + delta);
+		}
+
+		@Override
+		public List<Integer> getLocalValue() {
+			return this.resultVector;
+		}
+
+		@Override
+		public void resetLocal() {
+			// clear the result vector if the accumulator instance shall be reused
+			this.resultVector.clear();
+		}
+
+		@Override
+		public void merge(final Accumulator<Integer, List<Integer>> other) {
+			// merge two vector accumulators by adding their up their vector components
+			final List<Integer> otherVector = other.getLocalValue();
+			for (int index = 0; index < otherVector.size(); index++) {
+				updateResultVector(index, otherVector.get(index));
+			}
+		}
+
+		@Override
+		public void write(final DataOutputView out) throws IOException {
+			// binary serialization of the result vector:
+			// [number of components, component 0, component 1, ...]
+			out.writeInt(this.resultVector.size());
+			for (final Integer component : this.resultVector) {
+				out.writeInt(component);
+			}
+		}
+
+		@Override
+		public void read(final DataInputView in) throws IOException {
+			// binary deserialization of the result vector
+			final int size = in.readInt();
+			for (int numReadComponents = 0; numReadComponents < size; numReadComponents++) {
+				final int component = in.readInt();
+				this.resultVector.add(component);
+			}
+		}
+
+	}
+}


Mime
View raw message