flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [04/10] flink git commit: [gelly] [refactoring] Removed Example end string from all gelly examples
Date Tue, 19 May 2015 21:03:33 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
deleted file mode 100644
index c43dbaa..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.library.LabelPropagation;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-/**
- * This example uses the label propagation algorithm to detect communities by
- * propagating labels. Initially, each vertex is assigned its id as its label.
- * The vertices iteratively propagate their labels to their neighbors and adopt
- * the most frequent label among their neighbors. The algorithm converges when
- * no vertex changes value or the maximum number of iterations have been
- * reached.
- *
- * The edges input file is expected to contain one edge per line, with long IDs
- * in the following format:"<sourceVertexID>\t<targetVertexID>".
- *
- * The vertices input file is expected to contain one vertex per line, with long IDs
- * and long vertex values, in the following format:"<vertexID>\t<vertexValue>".
- *
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- */
-public class LabelPropagationExample implements ProgramDescription {
-
-	public static void main(String[] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		// Set up the execution environment
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// Set up the graph
-		DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
-		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
-
-		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges,	env);
-
-		// Set up the program
-		DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
-				new LabelPropagation<Long>(maxIterations)).getVertices();
-
-		// Emit results
-		if(fileOutput) {
-			verticesWithCommunity.writeAsCsv(outputPath, "\n", ",");
-		} else {
-			verticesWithCommunity.print();
-		}
-
-		// Execute the program
-		env.execute("Label Propagation Example");
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String vertexInputPath = null;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-	private static long numVertices = 100;
-	private static int maxIterations = 10;
-
-	private static boolean parseParameters(String[] args) {
-
-		if(args.length > 0) {
-			if(args.length != 4) {
-				System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
-				return false;
-			}
-
-			fileOutput = true;
-			vertexInputPath = args[0];
-			edgeInputPath = args[1];
-			outputPath = args[2];
-			maxIterations = Integer.parseInt(args[3]);
-		} else {
-			System.out.println("Executing LabelPropagation example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
-		}
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
-
-		if (fileOutput) {
-			return env.readCsvFile(vertexInputPath)
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new Tuple2ToVertexMap<Long, Long>());
-		}
-
-		return env.generateSequence(1, numVertices).map(
-				new MapFunction<Long, Vertex<Long, Long>>() {
-					public Vertex<Long, Long> map(Long l) throws Exception {
-						return new Vertex<Long, Long>(l, l);
-					}
-				});
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
-
-		if (fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-						@Override
-						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
-							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
-						}
-					});
-		}
-
-		return env.generateSequence(1, numVertices).flatMap(
-				new FlatMapFunction<Long, Edge<Long, NullValue>>() {
-					@Override
-					public void flatMap(Long key,
-							Collector<Edge<Long, NullValue>> out) {
-						int numOutEdges = (int) (Math.random() * (numVertices / 2));
-						for (int i = 0; i < numOutEdges; i++) {
-							long target = (long) (Math.random() * numVertices) + 1;
-							out.collect(new Edge<Long, NullValue>(key, target,
-									NullValue.getInstance()));
-						}
-					}
-				});
-	}
-
-	@Override
-	public String getDescription() {
-		return "Label Propagation Example";
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index e8871eb..dbf26e9 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -37,7 +37,7 @@ import org.apache.flink.graph.EdgesFunctionWithVertexValue;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.MusicProfilesData;
-import org.apache.flink.graph.library.LabelPropagation;
+import org.apache.flink.graph.library.LabelPropagationAlgorithm;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
@@ -140,7 +140,7 @@ public class MusicProfiles implements ProgramDescription {
 							public Long map(Tuple2<Long, Long> value) {
 								return value.f1;
 							}
-						}).run(new LabelPropagation<String>(maxIterations))
+						}).run(new LabelPropagationAlgorithm<String>(maxIterations))
 				.getVertices();
 
 		if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
new file mode 100644
index 0000000..68d59ad
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.PageRankAlgorithm;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example implements a simple PageRank algorithm, using a vertex-centric iteration.
+ *
+ * The edges input file is expected to contain one edge per line, with long IDs and double
+ * values, in the following format:"<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
+ *
+ * If no arguments are provided, the example runs with a random graph of 10 vertices
+ * whose edges all have an initial weight of 1.0.
+ *
+ */
+public class PageRank implements ProgramDescription {
+
+	@SuppressWarnings("serial")
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
+
+		Graph<Long, Double, Double> network = Graph.fromDataSet(links, new MapFunction<Long, Double>() {
+
+			public Double map(Long value) throws Exception {
+				return 1.0;
+			}
+		}, env);
+
+		DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
+
+		// assign the transition probabilities as the edge weights
+		Graph<Long, Double, Double> networkWithWeights = network
+				.joinWithEdgesOnSource(vertexOutDegrees,
+						new MapFunction<Tuple2<Double, Long>, Double>() {
+							public Double map(Tuple2<Double, Long> value) {
+								return value.f0 / value.f1;
+							}
+						});
+
+		DataSet<Vertex<Long, Double>> pageRanks = networkWithWeights.run(
+				new PageRankAlgorithm<Long>(DAMPENING_FACTOR, maxIterations))
+				.getVertices();
+
+		if (fileOutput) {
+			pageRanks.writeAsCsv(outputPath, "\n", "\t");
+		} else {
+			pageRanks.print();
+		}
+
+		env.execute();
+	}
+
+	@Override
+	public String getDescription() {
+		return "PageRank example";
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static final double DAMPENING_FACTOR = 0.85;
+	private static long numPages = 10;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+	private static int maxIterations = 10;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 3) {
+				System.err.println("Usage: PageRank <input edges path> <output path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+			maxIterations = Integer.parseInt(args[2]);
+		} else {
+			System.out.println("Executing PageRank example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: PageRank <input edges path> <output path> <num iterations>");
+		}
+		return true;
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
+
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class, Double.class)
+					.map(new Tuple3ToEdgeMap<Long, Double>());
+		}
+
+		return env.generateSequence(1, numPages).flatMap(
+				new FlatMapFunction<Long, Edge<Long, Double>>() {
+					@Override
+					public void flatMap(Long key,
+							Collector<Edge<Long, Double>> out) throws Exception {
+						int numOutEdges = (int) (Math.random() * (numPages / 2));
+						for (int i = 0; i < numOutEdges; i++) {
+							long target = (long) (Math.random() * numPages) + 1;
+							out.collect(new Edge<Long, Double>(key, target, 1.0));
+						}
+					}
+				});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
deleted file mode 100644
index 2e36ad9..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.library.PageRank;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-import org.apache.flink.util.Collector;
-
-/**
- * This example implements a simple PageRank algorithm, using a vertex-centric iteration.
- *
- * The edges input file is expected to contain one edge per line, with long IDs and double
- * values, in the following format:"<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
- *
- * If no arguments are provided, the example runs with a random graph of 10 vertices
- * and random edge weights.
- *
- */
-public class PageRankExample implements ProgramDescription {
-
-	@SuppressWarnings("serial")
-	public static void main(String[] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
-
-		Graph<Long, Double, Double> network = Graph.fromDataSet(links, new MapFunction<Long, Double>() {
-
-			public Double map(Long value) throws Exception {
-				return 1.0;
-			}
-		}, env);
-
-		DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
-
-		// assign the transition probabilities as the edge weights
-		Graph<Long, Double, Double> networkWithWeights = network
-				.joinWithEdgesOnSource(vertexOutDegrees,
-						new MapFunction<Tuple2<Double, Long>, Double>() {
-							public Double map(Tuple2<Double, Long> value) {
-								return value.f0 / value.f1;
-							}
-						});
-
-		DataSet<Vertex<Long, Double>> pageRanks = networkWithWeights.run(
-				new PageRank<Long>(DAMPENING_FACTOR, maxIterations))
-				.getVertices();
-
-		if (fileOutput) {
-			pageRanks.writeAsCsv(outputPath, "\n", "\t");
-		} else {
-			pageRanks.print();
-		}
-
-		env.execute();
-	}
-
-	@Override
-	public String getDescription() {
-		return "PageRank example";
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static final double DAMPENING_FACTOR = 0.85;
-	private static long numPages = 10;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-	private static int maxIterations = 10;
-
-	private static boolean parseParameters(String[] args) {
-
-		if(args.length > 0) {
-			if(args.length != 3) {
-				System.err.println("Usage: PageRank <input edges path> <output path> <num iterations>");
-				return false;
-			}
-
-			fileOutput = true;
-			edgeInputPath = args[0];
-			outputPath = args[1];
-			maxIterations = Integer.parseInt(args[2]);
-		} else {
-			System.out.println("Executing PageRank example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: PageRank <input edges path> <output path> <num iterations>");
-		}
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
-
-		if (fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class, Double.class)
-					.map(new Tuple3ToEdgeMap<Long, Double>());
-		}
-
-		return env.generateSequence(1, numPages).flatMap(
-				new FlatMapFunction<Long, Edge<Long, Double>>() {
-					@Override
-					public void flatMap(Long key,
-							Collector<Edge<Long, Double>> out) throws Exception {
-						int numOutEdges = (int) (Math.random() * (numPages / 2));
-						for (int i = 0; i < numOutEdges; i++) {
-							long target = (long) (Math.random() * numPages) + 1;
-							out.collect(new Edge<Long, Double>(key, target, 1.0));
-						}
-					}
-				});
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
deleted file mode 100644
index f6cc9bf..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.SimpleCommunityDetectionData;
-import org.apache.flink.graph.library.SimpleCommunityDetection;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-
-/**
- * This example shows how to use the {@link org.apache.flink.graph.library.SimpleCommunityDetection}
- * library method:
- * <ul>
- * 	<li> with the edge data set given as a parameter
- * 	<li> with default data
- * </ul>
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, weight which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t1.0\n1\t3\t2.0\n</code> defines two edges,
- * 1-2 with weight 1.0 and 1-3 with weight 2.0.
- *
- * Usage <code>SimpleCommunityDetection &lt;edge path&gt; &lt;result path&gt;
- * &lt;number of iterations&gt; &lt;delta&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.SimpleCommunityDetectionData}
- */
-public class SimpleCommunityDetectionExample implements ProgramDescription {
-
-	@SuppressWarnings("serial")
-	public static void main(String [] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// set up the graph
-		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-		Graph<Long, Long, Double> graph = Graph.fromDataSet(edges,
-				new MapFunction<Long, Long>() {
-					@Override
-					public Long map(Long label) throws Exception {
-						return label;
-					}
-				}, env);
-
-		// the result is in the form of <vertexId, communityId>, where the communityId is the label
-		// which the vertex converged to
-		DataSet<Vertex<Long, Long>> communityVertices =
-				graph.run(new SimpleCommunityDetection(maxIterations, delta)).getVertices();
-
-		// emit result
-		if (fileOutput) {
-			communityVertices.writeAsCsv(outputPath, "\n", ",");
-		} else {
-			communityVertices.print();
-		}
-
-		env.execute("Executing Simple Community Detection Example");
-	}
-
-	@Override
-	public String getDescription() {
-		return "Simple Community Detection Example";
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-	private static Integer maxIterations = SimpleCommunityDetectionData.MAX_ITERATIONS;
-	private static Double delta = SimpleCommunityDetectionData.DELTA;
-
-	private static boolean parseParameters(String [] args) {
-		if(args.length > 0) {
-			if(args.length != 4) {
-				System.err.println("Usage SimpleCommunityDetection <edge path> <output path> " +
-						"<num iterations> <delta>");
-				return false;
-			}
-
-			fileOutput = true;
-			edgeInputPath = args[0];
-			outputPath = args[1];
-			maxIterations = Integer.parseInt(args[2]);
-			delta = Double.parseDouble(args[3]);
-
-		} else {
-			System.out.println("Executing SimpleCommunityDetection example with default parameters and built-in default data.");
-			System.out.println("Provide parameters to read input data from files.");
-			System.out.println("Usage SimpleCommunityDetection <edge path> <output path> " +
-					"<num iterations> <delta>");
-		}
-
-		return true;
-	}
-
-	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
-
-		if(fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.ignoreComments("#")
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class, Double.class)
-					.map(new Tuple3ToEdgeMap<Long, Double>());
-		} else {
-			return SimpleCommunityDetectionData.getDefaultEdgeDataSet(env);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
new file mode 100644
index 0000000..ae43e4c
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.library.SingleSourceShortestPathsAlgorithm;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example implements the Single Source Shortest Paths algorithm,
+ * using a vertex-centric iteration.
+ *
+ * The input file is expected to contain one edge per line, with long IDs
+ * and double weights, separated by tabs, in the following format:
+ * "<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
+ *
+ * If no arguments are provided, the example runs with default data from {@link SingleSourceShortestPathsData}.
+ *
+ */
+public class SingleSourceShortestPaths implements ProgramDescription {
+
+	@SuppressWarnings("serial")
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges,
+				new MapFunction<Long, Double>() {
+
+					public Double map(Long value) {
+						return Double.MAX_VALUE;
+					}
+		}, env);
+
+		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
+				.run(new SingleSourceShortestPathsAlgorithm<Long>(srcVertexId, maxIterations))
+				.getVertices();
+
+		// emit result
+		if (fileOutput) {
+			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			singleSourceShortestPaths.print();
+		}
+
+		env.execute("Single Source Shortest Paths Example");
+	}
+
+	@Override
+	public String getDescription() {
+		return "Single Source Shortest Paths";
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	private static Long srcVertexId = 1l;
+
+	private static String edgesInputPath = null;
+
+	private static String outputPath = null;
+
+	private static int maxIterations = 5;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 4) {
+				System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			srcVertexId = Long.parseLong(args[0]);
+			edgesInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+				System.out.println("Executing Single Source Shortest Paths example "
+						+ "with default parameters and built-in default data.");
+				System.out.println("  Provide parameters to read input data from files.");
+				System.out.println("  See the documentation for the correct format of input files.");
+				System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+		}
+		return true;
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.lineDelimiter("\n")
+					.fieldDelimiter("\t")
+					.types(Long.class, Long.class, Double.class)
+					.map(new Tuple3ToEdgeMap<Long, Double>());
+		} else {
+			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
deleted file mode 100644
index 1066558..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.graph.library.SingleSourceShortestPaths;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-
-/**
- * This example implements the Single Source Shortest Paths algorithm,
- * using a vertex-centric iteration.
- *
- * The input file is expected to contain one edge per line, with long IDs
- * and double weights, separated by tabs, in the following format:
- * "<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
- *
- * If no arguments are provided, the example runs with default data from {@link SingleSourceShortestPathsData}.
- *
- */
-public class SingleSourceShortestPathsExample implements ProgramDescription {
-
-	@SuppressWarnings("serial")
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges,
-				new MapFunction<Long, Double>() {
-
-					public Double map(Long value) {
-						return Double.MAX_VALUE;
-					}
-		}, env);
-
-		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
-				.run(new SingleSourceShortestPaths<Long>(srcVertexId, maxIterations))
-				.getVertices();
-
-		// emit result
-		if (fileOutput) {
-			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",");
-		} else {
-			singleSourceShortestPaths.print();
-		}
-
-		env.execute("Single Source Shortest Paths Example");
-	}
-
-	@Override
-	public String getDescription() {
-		return "Single Source Shortest Paths";
-	}
-
-	// ******************************************************************************************************************
-	// UTIL METHODS
-	// ******************************************************************************************************************
-
-	private static boolean fileOutput = false;
-
-	private static Long srcVertexId = 1l;
-
-	private static String edgesInputPath = null;
-
-	private static String outputPath = null;
-
-	private static int maxIterations = 5;
-
-	private static boolean parseParameters(String[] args) {
-
-		if(args.length > 0) {
-			if(args.length != 4) {
-				System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-						" <input edges path> <output path> <num iterations>");
-				return false;
-			}
-
-			fileOutput = true;
-			srcVertexId = Long.parseLong(args[0]);
-			edgesInputPath = args[1];
-			outputPath = args[2];
-			maxIterations = Integer.parseInt(args[3]);
-		} else {
-				System.out.println("Executing Single Source Shortest Paths example "
-						+ "with default parameters and built-in default data.");
-				System.out.println("  Provide parameters to read input data from files.");
-				System.out.println("  See the documentation for the correct format of input files.");
-				System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-						" <input edges path> <output path> <num iterations>");
-		}
-		return true;
-	}
-
-	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(edgesInputPath)
-					.lineDelimiter("\n")
-					.fieldDelimiter("\t")
-					.types(Long.class, Long.class, Double.class)
-					.map(new Tuple3ToEdgeMap<Long, Double>());
-		} else {
-			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
new file mode 100644
index 0000000..629f5ef
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default input for the community detection example program: an edge data set that
+ * is used when the program is started without parameters.
+ */
+public class CommunityDetectionData {
+
+	// capped, because the algorithm is not guaranteed to always converge
+	public static final Integer MAX_ITERATIONS = 30;
+
+	public static final double DELTA = 0.5f;
+
+	// {source id, target id} of every default edge, in insertion order
+	private static final long[][] ENDPOINTS = {
+			{1, 2}, {1, 3}, {1, 4}, {2, 3}, {2, 4}, {3, 5},
+			{5, 6}, {5, 7}, {6, 7}, {7, 12}, {8, 9}, {8, 10},
+			{8, 11}, {9, 10}, {9, 11}, {10, 11}, {10, 12}, {11, 12}
+	};
+
+	/**
+	 * Builds the default edge data set.
+	 *
+	 * The i-th edge (1-based) carries the weight i, so the weights run from 1.0 to 18.0.
+	 *
+	 * @param env the execution environment in which the data set is created
+	 * @return the 18 default weighted edges
+	 */
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		List<Edge<Long, Double>> result = new ArrayList<Edge<Long, Double>>();
+		for (int i = 0; i < ENDPOINTS.length; i++) {
+			long[] pair = ENDPOINTS[i];
+			result.add(new Edge<Long, Double>(pair[0], pair[1], (double) (i + 1)));
+		}
+		return env.fromCollection(result);
+	}
+
+	// not instantiable
+	private CommunityDetectionData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
new file mode 100644
index 0000000..b9556a9
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default data for the connected components example program.
+ * It is used when the program is started without parameters.
+ */
+public class ConnectedComponentsDefaultData {
+
+	public static final Integer MAX_ITERATIONS = 4;
+
+	public static final String EDGES = "1	2\n" + "2	3\n" + "2	4\n" + "3	4";
+
+	public static final String VERTICES_WITH_MIN_ID = "1,1\n" + "2,1\n" + "3,1\n" + "4,1";
+
+	/** Creates the four default edges (1-2, 2-3, 2-4, 3-4), each carrying a NullValue. */
+	public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		final long[][] endpoints = {{1L, 2L}, {2L, 3L}, {2L, 4L}, {3L, 4L}};
+		List<Edge<Long, NullValue>> result = new ArrayList<Edge<Long, NullValue>>();
+		for (long[] pair : endpoints) {
+			result.add(new Edge<Long, NullValue>(pair[0], pair[1], NullValue.getInstance()));
+		}
+		return env.fromCollection(result);
+	}
+
+	private ConnectedComponentsDefaultData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsExampleData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsExampleData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsExampleData.java
deleted file mode 100644
index 9e55876..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsExampleData.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.types.NullValue;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Provides the default data sets used for the connected components example program.
- * If no parameters are given to the program, the default data sets are used.
- */
-public class ConnectedComponentsExampleData {
-
-	public static final Integer MAX_ITERATIONS = 4;
-
-	public static final String EDGES = "1	2\n" + "2	3\n" + "2	4\n" + "3	4";
-
-	public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-		List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
-		edges.add(new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance()));
-		edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
-		edges.add(new Edge<Long, NullValue>(2L, 4L, NullValue.getInstance()));
-		edges.add(new Edge<Long, NullValue>(3L, 4L, NullValue.getInstance()));
-
-		return env.fromCollection(edges);
-	}
-
-	public static final String VERTICES_WITH_MIN_ID = "1,1\n" + "2,1\n" + "3,1\n" + "4,1";
-
-	private ConnectedComponentsExampleData() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java
deleted file mode 100644
index 3f1c7bb..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.types.NullValue;
-
-public class EdgeWithLongIdNullValueParser extends RichMapFunction<String, Edge<Long, NullValue>> {
-	private static final long serialVersionUID = 1L;
-
-	public Edge<Long, NullValue> map(String value) {
-		String[] nums = value.split(" ");
-		return new Edge<Long, NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]), 
-				NullValue.getInstance());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
index 0d13f72..80765bf 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.EuclideanGraphExample;
+import org.apache.flink.graph.example.EuclideanGraphWeighing;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -38,12 +38,12 @@ public class EuclideanGraphData {
 	public static final String VERTICES = "1,1.0,1.0\n" + "2,2.0,2.0\n" + "3,3.0,3.0\n" + "4,4.0,4.0\n" + "5,5.0,5.0\n" +
 			"6,6.0,6.0\n" + "7,7.0,7.0\n" + "8,8.0,8.0\n" + "9,9.0,9.0";
 
-	public static DataSet<Vertex<Long, EuclideanGraphExample.Point>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+	public static DataSet<Vertex<Long, EuclideanGraphWeighing.Point>> getDefaultVertexDataSet(ExecutionEnvironment env) {
 
-		List<Vertex<Long, EuclideanGraphExample.Point>> vertices = new ArrayList<Vertex<Long, EuclideanGraphExample.Point>>();
+		List<Vertex<Long, EuclideanGraphWeighing.Point>> vertices = new ArrayList<Vertex<Long, EuclideanGraphWeighing.Point>>();
 		for(int i=1; i<=NUM_VERTICES; i++) {
-			vertices.add(new Vertex<Long, EuclideanGraphExample.Point>(new Long(i),
-					new EuclideanGraphExample.Point(new Double(i), new Double(i))));
+			vertices.add(new Vertex<Long, EuclideanGraphWeighing.Point>(new Long(i),
+					new EuclideanGraphWeighing.Point(new Double(i), new Double(i))));
 		}
 
 		return env.fromCollection(vertices);

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
deleted file mode 100644
index 20b562b..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Provides the default data set used for the Simple Community Detection example program.
- * If no parameters are given to the program, the default edge data set is used.
- */
-public class SimpleCommunityDetectionData {
-
-	// the algorithm is not guaranteed to always converge
-	public static final Integer MAX_ITERATIONS = 30;
-
-	public static final double DELTA = 0.5f;
-
-	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
-		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
-		edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
-		edges.add(new Edge<Long, Double>(1L, 3L, 2.0));
-		edges.add(new Edge<Long, Double>(1L, 4L, 3.0));
-		edges.add(new Edge<Long, Double>(2L, 3L, 4.0));
-		edges.add(new Edge<Long, Double>(2L, 4L, 5.0));
-		edges.add(new Edge<Long, Double>(3L, 5L, 6.0));
-		edges.add(new Edge<Long, Double>(5L, 6L, 7.0));
-		edges.add(new Edge<Long, Double>(5L, 7L, 8.0));
-		edges.add(new Edge<Long, Double>(6L, 7L, 9.0));
-		edges.add(new Edge<Long, Double>(7L, 12L, 10.0));
-		edges.add(new Edge<Long, Double>(8L, 9L, 11.0));
-		edges.add(new Edge<Long, Double>(8L, 10L, 12.0));
-		edges.add(new Edge<Long, Double>(8L, 11L, 13.0));
-		edges.add(new Edge<Long, Double>(9L, 10L, 14.0));
-		edges.add(new Edge<Long, Double>(9L, 11L, 15.0));
-		edges.add(new Edge<Long, Double>(10L, 11L, 16.0));
-		edges.add(new Edge<Long, Double>(10L, 12L, 17.0));
-		edges.add(new Edge<Long, Double>(11L, 12L, 18.0));
-
-		return env.fromCollection(edges);
-	}
-
-	private SimpleCommunityDetectionData() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
index 67cd150..cf0034a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
@@ -25,6 +25,10 @@ import org.apache.flink.graph.Edge;
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Provides the default data set used for the Single Source Shortest Paths example program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
 public class SingleSourceShortestPathsData {
 
 	public static final Long SRC_VERTEX_ID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
new file mode 100644
index 0000000..47e3595
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Community Detection Algorithm.
+ *
+ * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value.
+ * The vertices propagate their labels and max scores in iterations, each time adopting the label with the
+ * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction
+ * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value.
+ *
+ * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
+ * is reached.
+ *
+ * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
+ */
+public class CommunityDetectionAlgorithm implements GraphAlgorithm<Long, Long, Double> {
+
+	// maximum number of iterations handed to the vertex-centric iteration
+	private Integer maxIterations;
+	// re-scoring parameter handed to the VertexLabelUpdater
+	private Double delta;
+
+	/** Creates the algorithm with the given iteration limit and re-scoring parameter delta. */
+	public CommunityDetectionAlgorithm(Integer maxIterations, Double delta) {
+		this.maxIterations = maxIterations;
+		this.delta = delta;
+	}
+
+	/**
+	 * Runs the label propagation on the undirected version of the given graph: every vertex value is
+	 * paired with an initial score of 1.0, the vertex-centric iteration is run, and the scores are dropped.
+	 */
+	@Override
+	public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) {
+		Graph<Long, Long, Double> undirectedGraph = graph.getUndirected();
+
+		Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph
+				.mapVertices(new AddScoreToVertexValuesMapper());
+
+		return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta),
+				new LabelMessenger(), maxIterations)
+				.mapVertices(new RemoveScoreFromVertexValuesMapper());
+	}
+
+	/**
+	 * Adopts the received label with the highest summed score and re-scores it.
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+		private Double delta;
+
+		public VertexLabelUpdater(Double delta) {
+			this.delta = delta;
+		}
+
+		/**
+		 * Sums the received scores per label, picks the label with the highest sum (the smallest label wins
+		 * ties) and sets it as the new vertex value together with the highest single score received for it.
+		 * When the adopted label differs from the current one, that score is reduced by delta / superstep.
+		 * The vertex value is left untouched if no messages were received.
+		 */
+		@Override
+		public void updateVertex(Long vertexKey, Tuple2<Long, Double> labelScore,
+								MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
+
+			// TreeMaps keep the labels ordered, so ties are resolved the same way on every run
+			Map<Long, Double> receivedLabelsWithScores = new TreeMap<Long, Double>();
+			Map<Long, Double> labelsWithHighestScore = new TreeMap<Long, Double>();
+
+			for (Tuple2<Long, Double> message : inMessages) {
+				Long receivedLabel = message.f0;
+				Double receivedScore = message.f1;
+
+				// accumulate the total score received for this label
+				Double scoreSoFar = receivedLabelsWithScores.get(receivedLabel);
+				receivedLabelsWithScores.put(receivedLabel,
+						scoreSoFar == null ? receivedScore : Double.valueOf(receivedScore + scoreSoFar));
+
+				// remember the highest single score received for this label
+				Double bestSoFar = labelsWithHighestScore.get(receivedLabel);
+				if (bestSoFar == null || bestSoFar < receivedScore) {
+					labelsWithHighestScore.put(receivedLabel, receivedScore);
+				}
+			}
+
+			if (receivedLabelsWithScores.isEmpty()) {
+				return;
+			}
+
+			// find the label with the highest summed score among the ones received
+			Double maxScore = -Double.MAX_VALUE;
+			Long maxScoreLabel = labelScore.f0;
+			for (Map.Entry<Long, Double> entry : receivedLabelsWithScores.entrySet()) {
+				if (entry.getValue() > maxScore) {
+					maxScore = entry.getValue();
+					maxScoreLabel = entry.getKey();
+				}
+			}
+
+			// highest single score received for the adopted label
+			Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
+			// Re-score only when the label changes. The labels are boxed Longs, so they have to be compared
+			// with equals(): != compares references and is only reliable for small cached values.
+			if (!maxScoreLabel.equals(labelScore.f0)) {
+				highestScore -= delta / getSuperstepNumber();
+			}
+			setNewVertexValue(new Tuple2<Long, Double>(maxScoreLabel, highestScore));
+		}
+	}
+
+	/** Sends the vertex label to the target of every outgoing edge, with the score multiplied by the edge value. */
+	@SuppressWarnings("serial")
+	public static final class LabelMessenger extends MessagingFunction<Long, Tuple2<Long, Double>,
+			Tuple2<Long, Double>, Double> {
+		@Override
+		public void sendMessages(Long vertexKey, Tuple2<Long, Double> vertexValue) throws Exception {
+			for(Edge<Long, Double> edge : getOutgoingEdges()) {
+				sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertexValue.f0, vertexValue.f1 * edge.getValue()));
+			}
+		}
+	}
+
+	/** Pairs every vertex value (the label) with the initial score 1.0. */
+	@SuppressWarnings("serial")
+	public static final class AddScoreToVertexValuesMapper implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Double>> {
+		@Override
+		public Tuple2<Long, Double> map(Vertex<Long, Long> vertex) throws Exception {
+			return new Tuple2<Long, Double>(vertex.getValue(), 1.0);
+		}
+	}
+
+	/** Drops the score and keeps only the label as vertex value. */
+	@SuppressWarnings("serial")
+	public static final class RemoveScoreFromVertexValuesMapper implements MapFunction<Vertex<Long, Tuple2<Long, Double>>, Long> {
+		@Override
+		public Long map(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
+			return vertex.getValue().f0;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
deleted file mode 100644
index d6d8f0b..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.types.NullValue;
-
-/**
- * Connected components algorithm.
- *
- * Initially, each vertex will have its own ID as a value(is its own component). The vertices propagate their
- * current component ID in iterations, each time adopting a new value from the received neighbor IDs,
- * provided that the value is less than the current minimum.
- *
- * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
- * is reached.
- */
-@SuppressWarnings("serial")
-public class ConnectedComponents implements GraphAlgorithm<Long, Long, NullValue>{
-
-	private Integer maxIterations;
-
-	public ConnectedComponents(Integer maxIterations) {
-		this.maxIterations = maxIterations;
-	}
-
-	@Override
-	public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> graph) throws Exception {
-
-		Graph<Long, Long, NullValue> undirectedGraph = graph.getUndirected();
-
-		// initialize vertex values and run the Vertex Centric Iteration
-		return undirectedGraph.runVertexCentricIteration(new CCUpdater(),
-				new CCMessenger(), maxIterations);
-	}
-
-	/**
-	 * Updates the value of a vertex by picking the minimum neighbor ID out of all the incoming messages.
-	 */
-	public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
-
-		@Override
-		public void updateVertex(Long id, Long currentMin, MessageIterator<Long> messages) throws Exception {
-			long min = Long.MAX_VALUE;
-
-			for (long msg : messages) {
-				min = Math.min(min, msg);
-			}
-
-			// update vertex value, if new minimum
-			if (min < currentMin) {
-				setNewVertexValue(min);
-			}
-		}
-	}
-
-	/**
-	 * Distributes the minimum ID associated with a given vertex among all the target vertices.
-	 */
-	public static final class CCMessenger extends MessagingFunction<Long, Long, Long, NullValue> {
-
-		@Override
-		public void sendMessages(Long id, Long currentMin) throws Exception {
-			// send current minimum to neighbors
-			sendMessageToAllNeighbors(currentMin);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
new file mode 100644
index 0000000..a2ba2ac
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Connected components algorithm.
+ *
+ * Initially, each vertex has its own ID as its value (i.e., it is its own component). The vertices propagate their
+ * current component ID in iterations, each time adopting a new value from the received neighbor IDs,
+ * provided that the value is less than the current minimum.
+ *
+ * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
+ * is reached.
+ */
+@SuppressWarnings("serial")
+public class ConnectedComponentsAlgorithm implements GraphAlgorithm<Long, Long, NullValue>{
+
+	private Integer maxIterations;
+
+	public ConnectedComponentsAlgorithm(Integer maxIterations) {
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> graph) throws Exception {
+
+		Graph<Long, Long, NullValue> undirectedGraph = graph.getUndirected();
+
+		// run the vertex-centric iteration on the undirected graph
+		return undirectedGraph.runVertexCentricIteration(new CCUpdater(),
+				new CCMessenger(), maxIterations);
+	}
+
+	/**
+	 * Updates the value of a vertex by picking the minimum neighbor ID out of all the incoming messages.
+	 */
+	public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
+
+		@Override
+		public void updateVertex(Long id, Long currentMin, MessageIterator<Long> messages) throws Exception {
+			long min = Long.MAX_VALUE;
+
+			for (long msg : messages) {
+				min = Math.min(min, msg);
+			}
+
+			// update vertex value, if new minimum
+			if (min < currentMin) {
+				setNewVertexValue(min);
+			}
+		}
+	}
+
+	/**
+	 * Distributes the minimum ID associated with a given vertex among all the target vertices.
+	 */
+	public static final class CCMessenger extends MessagingFunction<Long, Long, Long, NullValue> {
+
+		@Override
+		public void sendMessages(Long id, Long currentMin) throws Exception {
+			// send current minimum to neighbors
+			sendMessageToAllNeighbors(currentMin);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
deleted file mode 100644
index d63a4c3..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.types.NullValue;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * An implementation of the label propagation algorithm. The iterative algorithm
- * detects communities by propagating labels. In each iteration, a vertex adopts
- * the label that is most frequent among its neighbors' labels. Labels are
- * represented by Longs and we assume a total ordering among them, in order to
- * break ties. The algorithm converges when no vertex changes its value or the
- * maximum number of iterations have been reached. Note that different
- * initializations might lead to different results.
- * 
- */
-@SuppressWarnings("serial")
-public class LabelPropagation<K> implements GraphAlgorithm<K, Long, NullValue> {
-
-	private final int maxIterations;
-
-	public LabelPropagation(int maxIterations) {
-		this.maxIterations = maxIterations;
-	}
-
-	@Override
-	public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
-
-		// iteratively adopt the most frequent label among the neighbors
-		// of each vertex
-		return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
-				maxIterations);
-	}
-
-	/**
-	 * Function that updates the value of a vertex by adopting the most frequent
-	 * label among its in-neighbors
-	 */
-	public static final class UpdateVertexLabel<K> extends VertexUpdateFunction<K, Long, Long> {
-
-		public void updateVertex(K vertexKey, Long vertexValue,
-				MessageIterator<Long> inMessages) {
-			Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
-
-			long maxFrequency = 1;
-			long mostFrequentLabel = vertexValue;
-
-			// store the labels with their frequencies
-			for (Long msg : inMessages) {
-				if (labelsWithFrequencies.containsKey(msg)) {
-					long currentFreq = labelsWithFrequencies.get(msg);
-					labelsWithFrequencies.put(msg, currentFreq + 1);
-				} else {
-					labelsWithFrequencies.put(msg, 1L);
-				}
-			}
-			// select the most frequent label: if two or more labels have the
-			// same frequency,
-			// the node adopts the label with the highest value
-			for (Entry<Long, Long> entry : labelsWithFrequencies.entrySet()) {
-				if (entry.getValue() == maxFrequency) {
-					// check the label value to break ties
-					if (entry.getKey() > mostFrequentLabel) {
-						mostFrequentLabel = entry.getKey();
-					}
-				} else if (entry.getValue() > maxFrequency) {
-					maxFrequency = entry.getValue();
-					mostFrequentLabel = entry.getKey();
-				}
-			}
-
-			// set the new vertex value
-			setNewVertexValue(mostFrequentLabel);
-		}
-	}
-
-	/**
-	 * Sends the vertex label to all out-neighbors
-	 */
-	public static final class SendNewLabelToNeighbors<K> extends MessagingFunction<K, Long, Long, NullValue> {
-
-		public void sendMessages(K vertexKey, Long newLabel) {
-			sendMessageToAllNeighbors(newLabel);
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
new file mode 100644
index 0000000..7fc0614
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.NullValue;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * An implementation of the label propagation algorithm. The iterative algorithm
+ * detects communities by propagating labels. In each iteration, a vertex adopts
+ * the label that is most frequent among its neighbors' labels. Labels are
+ * represented by Longs and we assume a total ordering among them, in order to
+ * break ties. The algorithm converges when no vertex changes its value or the
+ * maximum number of iterations has been reached. Note that different
+ * initializations might lead to different results.
+ * 
+ */
+@SuppressWarnings("serial")
+
+public class LabelPropagationAlgorithm<K extends Comparable<K> & Serializable>
+		implements GraphAlgorithm<K, Long, NullValue> {
+
+	private final int maxIterations;
+
+	public LabelPropagationAlgorithm(int maxIterations) {
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
+
+		// iteratively adopt the most frequent label among the neighbors
+		// of each vertex
+		return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
+				maxIterations);
+	}
+
+	/**
+	 * Function that updates the value of a vertex by adopting the most frequent
+	 * label among its in-neighbors
+	 */
+	public static final class UpdateVertexLabel<K> extends VertexUpdateFunction<K, Long, Long> {
+
+		public void updateVertex(K vertexKey, Long vertexValue,
+				MessageIterator<Long> inMessages) {
+			Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
+
+			long maxFrequency = 1;
+			long mostFrequentLabel = vertexValue;
+
+			// store the labels with their frequencies
+			for (Long msg : inMessages) {
+				if (labelsWithFrequencies.containsKey(msg)) {
+					long currentFreq = labelsWithFrequencies.get(msg);
+					labelsWithFrequencies.put(msg, currentFreq + 1);
+				} else {
+					labelsWithFrequencies.put(msg, 1L);
+				}
+			}
+			// select the most frequent label: if two or more labels have the
+			// same frequency,
+			// the node adopts the label with the highest value
+			for (Entry<Long, Long> entry : labelsWithFrequencies.entrySet()) {
+				if (entry.getValue() == maxFrequency) {
+					// check the label value to break ties
+					if (entry.getKey() > mostFrequentLabel) {
+						mostFrequentLabel = entry.getKey();
+					}
+				} else if (entry.getValue() > maxFrequency) {
+					maxFrequency = entry.getValue();
+					mostFrequentLabel = entry.getKey();
+				}
+			}
+
+			// set the new vertex value
+			setNewVertexValue(mostFrequentLabel);
+		}
+	}
+
+	/**
+	 * Sends the vertex label to all out-neighbors
+	 */
+	public static final class SendNewLabelToNeighbors<K> extends MessagingFunction<K, Long, Long, NullValue> {
+
+		public void sendMessages(K vertexKey, Long newLabel) {
+			sendMessageToAllNeighbors(newLabel);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
deleted file mode 100644
index bb0a1d1..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-public class PageRank<K> implements	GraphAlgorithm<K, Double, Double> {
-
-	private double beta;
-	private int maxIterations;
-
-	public PageRank(double beta, int maxIterations) {
-		this.beta = beta;
-		this.maxIterations = maxIterations;
-	}
-
-	@Override
-	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
-
-		final long numberOfVertices = network.numberOfVertices();
-		return network.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
-				maxIterations);
-	}
-
-	/**
-	 * Function that updates the rank of a vertex by summing up the partial
-	 * ranks from all incoming messages and then applying the dampening formula.
-	 */
-	@SuppressWarnings("serial")
-	public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
-
-		private final double beta;
-		private final long numVertices;
-		
-		public VertexRankUpdater(double beta, long numberOfVertices) {
-			this.beta = beta;
-			this.numVertices = numberOfVertices;
-		}
-
-		@Override
-		public void updateVertex(K vertexKey, Double vertexValue,
-				MessageIterator<Double> inMessages) {
-			double rankSum = 0.0;
-			for (double msg : inMessages) {
-				rankSum += msg;
-			}
-
-			// apply the dampening factor / random jump
-			double newRank = (beta * rankSum) + (1 - beta) / numVertices;
-			setNewVertexValue(newRank);
-		}
-	}
-
-	/**
-	 * Distributes the rank of a vertex among all target vertices according to
-	 * the transition probability, which is associated with an edge as the edge
-	 * value.
-	 */
-	@SuppressWarnings("serial")
-	public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
-		private final long numVertices;
-
-		public RankMessenger(long numberOfVertices) {
-			this.numVertices = numberOfVertices;
-		}
-
-		@Override
-		public void sendMessages(K vertexId, Double newRank) {
-			if (getSuperstepNumber() == 1) {
-				// initialize vertex ranks
-				newRank = 1.0 / numVertices;
-			}
-			for (Edge<K, Double> edge : getOutgoingEdges()) {
-				sendMessageTo(edge.getTarget(), newRank * edge.getValue());
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
new file mode 100644
index 0000000..4803b44
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+import java.io.Serializable;
+
+
+public class PageRankAlgorithm<K extends Comparable<K> & Serializable> implements
+		GraphAlgorithm<K, Double, Double> {
+
+	private double beta;
+	private int maxIterations;
+
+	public PageRankAlgorithm(double beta, int maxIterations) {
+		this.beta = beta;
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
+
+		final long numberOfVertices = network.numberOfVertices();
+		return network.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
+				maxIterations);
+	}
+
+	/**
+	 * Function that updates the rank of a vertex by summing up the partial
+	 * ranks from all incoming messages and then applying the damping formula.
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
+
+		private final double beta;
+		private final long numVertices;
+		
+		public VertexRankUpdater(double beta, long numberOfVertices) {
+			this.beta = beta;
+			this.numVertices = numberOfVertices;
+		}
+
+		@Override
+		public void updateVertex(K vertexKey, Double vertexValue,
+				MessageIterator<Double> inMessages) {
+			double rankSum = 0.0;
+			for (double msg : inMessages) {
+				rankSum += msg;
+			}
+
+			// apply the damping factor / random jump
+			double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+			setNewVertexValue(newRank);
+		}
+	}
+
+	/**
+	 * Distributes the rank of a vertex among all target vertices according to
+	 * the transition probability, which is associated with an edge as the edge
+	 * value.
+	 */
+	@SuppressWarnings("serial")
+	public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
+
+		private final long numVertices;
+
+		public RankMessenger(long numberOfVertices) {
+			this.numVertices = numberOfVertices;
+		}
+
+		@Override
+		public void sendMessages(K vertexId, Double newRank) {
+			if (getSuperstepNumber() == 1) {
+				// initialize vertex ranks
+				newRank = 1.0 / numVertices;
+			}
+			for (Edge<K, Double> edge : getOutgoingEdges()) {
+				sendMessageTo(edge.getTarget(), newRank * edge.getValue());
+			}
+		}
+	}
+}


Mime
View raw message