flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [2/4] flink git commit: [FLINK-2451] [gelly] removed redundant examples; added comments describing which gelly method each example illustrates.
Date Sat, 22 Aug 2015 19:39:33 GMT
[FLINK-2451] [gelly] removed redundant examples; added comments describing which gelly method each example illustrates.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/970ab35e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/970ab35e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/970ab35e

Branch: refs/heads/master
Commit: 970ab35ec8f0268ed38234f35fa50706c5ff3071
Parents: 3a83029
Author: vasia <vasia@apache.org>
Authored: Mon Aug 3 14:37:19 2015 +0200
Committer: vasia <vasia@apache.org>
Committed: Sat Aug 22 20:46:20 2015 +0200

----------------------------------------------------------------------
 .../flink/graph/example/CommunityDetection.java | 142 ------------
 .../graph/example/ConnectedComponents.java      |  11 +-
 .../graph/example/EuclideanGraphWeighing.java   |   3 +
 .../graph/example/GSAConnectedComponents.java   | 178 ---------------
 .../apache/flink/graph/example/GSAPageRank.java | 215 -------------------
 .../example/GSASingleSourceShortestPaths.java   |  27 ++-
 .../flink/graph/example/GraphMetrics.java       |   3 +-
 .../flink/graph/example/IncrementalSSSP.java    |   6 +-
 .../graph/example/JaccardSimilarityMeasure.java |  10 +-
 .../flink/graph/example/LabelPropagation.java   | 171 ---------------
 .../flink/graph/example/MusicProfiles.java      |  46 ++--
 .../apache/flink/graph/example/PageRank.java    | 153 -------------
 .../example/SingleSourceShortestPaths.java      | 106 +++++++--
 .../graph/library/GSAConnectedComponents.java   | 178 +++++++++++++++
 .../apache/flink/graph/library/GSAPageRank.java | 215 +++++++++++++++++++
 .../library/GSASingleSourceShortestPaths.java   | 180 ++++++++++++++++
 16 files changed, 721 insertions(+), 923 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
deleted file mode 100644
index e44e5bd..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.CommunityDetectionData;
-import org.apache.flink.graph.library.CommunityDetectionAlgorithm;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-
-/**
- * This example shows how to use the {@link org.apache.flink.graph.library.CommunityDetectionAlgorithm}
- * library method:
- * <ul>
- * 	<li> with the edge data set given as a parameter
- * 	<li> with default data
- * </ul>
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, weight which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t1.0\n1\t3\t2.0\n</code> defines two edges,
- * 1-2 with weight 1.0 and 1-3 with weight 2.0.
- *
- * Usage <code>CommunityDetection &lt;edge path&gt; &lt;result path&gt;
- * &lt;number of iterations&gt; &lt;delta&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.CommunityDetectionData}
- */
-public class CommunityDetection implements ProgramDescription {
-
-	@SuppressWarnings("serial")
-	public static void main(String [] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// set up the graph
-		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-		Graph<Long, Long, Double> graph = Graph.fromDataSet(edges,
-				new MapFunction<Long, Long>() {
-
-					public Long map(Long label) {
-						return label;
-					}
-				}, env);
-
-		// the result is in the form of <vertexId, communityId>, where the communityId is the label
-		// which the vertex converged to
-		DataSet<Vertex<Long, Long>> communityVertices =
-				graph.run(new CommunityDetectionAlgorithm(maxIterations, delta)).getVertices();
-
-		// emit result
-		if (fileOutput) {
-			communityVertices.writeAsCsv(outputPath, "\n", ",");
-
-			// since file sinks are lazy, we trigger the execution explicitly
-			env.execute("Executing Community Detection Example");
-		} else {
-			communityVertices.print();
-		}
-
-	}
-
-	@Override
-	public String getDescription() {
-		return "Community Detection";
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-	private static Integer maxIterations = CommunityDetectionData.MAX_ITERATIONS;
-	private static Double delta = CommunityDetectionData.DELTA;
-
-	private static boolean parseParameters(String [] args) {
-		if(args.length > 0) {
-			if(args.length != 4) {
-				System.err.println("Usage CommunityDetection <edge path> <output path> " +
-						"<num iterations> <delta>");
-				return false;
-			}
-
-			fileOutput = true;
-			edgeInputPath = args[0];
-			outputPath = args[1];
-			maxIterations = Integer.parseInt(args[2]);
-			delta = Double.parseDouble(args[3]);
-
-		} else {
-			System.out.println("Executing SimpleCommunityDetection example with default parameters and built-in default data.");
-			System.out.println("Provide parameters to read input data from files.");
-			System.out.println("Usage CommunityDetection <edge path> <output path> " +
-					"<num iterations> <delta>");
-		}
-
-		return true;
-	}
-
-	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
-
-		if(fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.ignoreComments("#")
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class, Double.class)
-					.map(new Tuple3ToEdgeMap<Long, Double>());
-		} else {
-			return CommunityDetectionData.getDefaultEdgeDataSet(env);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
index 3443a55..b841ced 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -31,12 +31,11 @@ import org.apache.flink.graph.library.ConnectedComponentsAlgorithm;
 import org.apache.flink.types.NullValue;
 
 /**
- * This example shows how to use the {@link org.apache.flink.graph.library.ConnectedComponentsAlgorithm}
- * library method:
- * <ul>
- * 	<li> with the edge data set given as a parameter
- * 	<li> with default data
- * </ul>
+ * This example shows how to use Gelly's library methods.
+ * You can find all available library methods in {@link org.apache.flink.graph.library}. 
+ * 
+ * In particular, this example uses the {@link org.apache.flink.graph.library.ConnectedComponentsAlgorithm}
+ * library method to compute the connected components of the input graph.
  *
  * The input file is a plain text file and must be formatted as follows:
  * Edges are represented by tuples of srcVertexId, trgVertexId which are

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
index 7e2c057..b7e3385 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
@@ -33,6 +33,9 @@ import org.apache.flink.graph.example.utils.EuclideanGraphData;
 import java.io.Serializable;
 
 /**
+ * This example shows how to use Gelly's {@link Graph#getTriplets()} and
+ * {@link Graph#joinWithEdges(DataSet, MapFunction)} methods.
+ * 
  * Given a directed, unweighted graph, with vertex values representing points in a plan,
  * return a weighted graph where the edge weights are equal to the Euclidean distance between the
  * src and the trg vertex values.

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
deleted file mode 100755
index 9b75c92..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-/**
- * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration
- */
-public class GSAConnectedComponents implements ProgramDescription {
-
-	// --------------------------------------------------------------------------------------------
-	//  Program
-	// --------------------------------------------------------------------------------------------
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
-
-		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
-
-		// Execute the GSA iteration
-		Graph<Long, Long, NullValue> result =
-				graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
-						new UpdateComponentId(), maxIterations);
-
-		// Extract the vertices as the result
-		DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
-
-		// emit result
-		if (fileOutput) {
-			connectedComponents.writeAsCsv(outputPath, "\n", " ");
-
-			// since file sinks are lazy, we trigger the execution explicitly
-			env.execute("GSA Connected Components");
-		} else {
-			connectedComponents.print();
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitVertices	implements MapFunction<Long, Long> {
-
-		public Long map(Long vertexId) {
-			return vertexId;
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Connected Components UDFs
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
-
-		public Long gather(Neighbor<Long, NullValue> neighbor) {
-			return neighbor.getNeighborValue();
-		}
-	};
-
-	@SuppressWarnings("serial")
-	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
-
-		public Long sum(Long newValue, Long currentValue) {
-			return Math.min(newValue, currentValue);
-		}
-	};
-
-	@SuppressWarnings("serial")
-	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
-
-		public void apply(Long summedValue, Long origValue) {
-			if (summedValue < origValue) {
-				setResult(summedValue);
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Util methods
-	// --------------------------------------------------------------------------------------------
-
-	private static boolean fileOutput = false;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-
-	private static int maxIterations = 16;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-
-			if (args.length != 3) {
-				System.err.println("Usage: GSAConnectedComponents <edge path> " +
-						"<result path> <max iterations>");
-				return false;
-			}
-
-			edgeInputPath = args[0];
-			outputPath = args[1];
-			maxIterations = Integer.parseInt(args[2]);
-		} else {
-			System.out.println("Executing GSA Connected Components example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: GSAConnectedComponents <edge path> <result path> <max iterations>");
-		}
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
-							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
-						}
-					});
-		}
-
-		// Generates 3 components of size 2
-		return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
-			@Override
-			public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
-				out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
-			}
-		});
-	}
-
-	@Override
-	public String getDescription() {
-		return "GSA Connected Components";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java
deleted file mode 100644
index 45d4555..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * This example implements a simple PageRank algorithm, using a gather-sum-apply iteration.
- *
- * The edges input file is expected to contain one edge per line, with long IDs and no
- * values, in the following format:"<sourceVertexID>\t<targetVertexID>".
- *
- * If no arguments are provided, the example runs with a random graph of 10 vertices
- * and random edge weights.
- */
-public class GSAPageRank implements ProgramDescription {
-
-	@SuppressWarnings("serial")
-	public static void main(String[] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
-
-		Graph<Long, Double, Double> network = Graph.fromDataSet(links, new MapFunction<Long, Double>() {
-
-			@Override
-			public Double map(Long value) throws Exception {
-				return 1.0;
-			}
-		}, env);
-
-		DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
-
-		// Assign the transition probabilities as the edge weights
-		Graph<Long, Double, Double> networkWithWeights = network
-				.joinWithEdgesOnSource(vertexOutDegrees,
-						new MapFunction<Tuple2<Double, Long>, Double>() {
-
-							@Override
-							public Double map(Tuple2<Double, Long> value) {
-								return value.f0 / value.f1;
-							}
-						});
-
-		long numberOfVertices = networkWithWeights.numberOfVertices();
-
-		// Execute the GSA iteration
-		Graph<Long, Double, Double> result = networkWithWeights
-				.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
-						new UpdateRanks(numberOfVertices), maxIterations);
-
-		// Extract the vertices as the result
-		DataSet<Vertex<Long, Double>> pageRanks = result.getVertices();
-
-		// emit result
-		if (fileOutput) {
-			pageRanks.writeAsCsv(outputPath, "\n", "\t");
-
-			// since file sinks are lazy, we trigger the execution explicitly
-			env.execute("GSA Page Ranks");
-		} else {
-			pageRanks.print();
-		}
-
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Page Rank UDFs
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static final class GatherRanks extends GatherFunction<Double, Double, Double> {
-
-		long numberOfVertices;
-
-		public GatherRanks(long numberOfVertices) {
-			this.numberOfVertices = numberOfVertices;
-		}
-
-		@Override
-		public Double gather(Neighbor<Double, Double> neighbor) {
-			double neighborRank = neighbor.getNeighborValue();
-
-			if(getSuperstepNumber() == 1) {
-				neighborRank = 1.0 / numberOfVertices;
-			}
-
-			return neighborRank * neighbor.getEdgeValue();
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumRanks extends SumFunction<Double, Double, Double> {
-
-		@Override
-		public Double sum(Double newValue, Double currentValue) {
-			return newValue + currentValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class UpdateRanks extends ApplyFunction<Long, Double, Double> {
-
-		long numberOfVertices;
-
-		public UpdateRanks(long numberOfVertices) {
-			this.numberOfVertices = numberOfVertices;
-		}
-
-		@Override
-		public void apply(Double rankSum, Double currentValue) {
-			setResult((1-DAMPENING_FACTOR)/numberOfVertices + DAMPENING_FACTOR * rankSum);
-		}
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static final double DAMPENING_FACTOR = 0.85;
-	private static long numPages = 10;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-	private static int maxIterations = 10;
-
-	private static boolean parseParameters(String[] args) {
-
-		if(args.length > 0) {
-			if(args.length != 3) {
-				System.err.println("Usage: GSAPageRank <input edges path> <output path> <num iterations>");
-				return false;
-			}
-
-			fileOutput = true;
-			edgeInputPath = args[0];
-			outputPath = args[1];
-			maxIterations = Integer.parseInt(args[2]);
-		} else {
-			System.out.println("Executing GSAPageRank example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: GSAPageRank <input edges path> <output path> <num iterations>");
-		}
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
-
-		if (fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
-						public Edge<Long, Double> map(Tuple2<Long, Long> input) {
-							return new Edge<Long, Double>(input.f0, input.f1, 1.0);
-						}
-					}).withForwardedFields("f0; f1");
-		}
-
-		return env.generateSequence(1, numPages).flatMap(
-				new FlatMapFunction<Long, Edge<Long, Double>>() {
-					@Override
-					public void flatMap(Long key,
-										Collector<Edge<Long, Double>> out) throws Exception {
-						int numOutEdges = (int) (Math.random() * (numPages / 2));
-						for (int i = 0; i < numOutEdges; i++) {
-							long target = (long) (Math.random() * numPages) + 1;
-							out.collect(new Edge<Long, Double>(key, target, 1.0));
-						}
-					}
-				});
-	}
-
-	@Override
-	public String getDescription() {
-		return "GSA Page Rank";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
index b01aa23..23a3a82 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
@@ -33,7 +33,19 @@ import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
- * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
+ * This example shows how to use Gelly's Gather-Sum-Apply iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ * For a vertex-centric implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths}. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
  */
 public class GSASingleSourceShortestPaths implements ProgramDescription {
 
@@ -54,9 +66,8 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
 
 		// Execute the GSA iteration
-		Graph<Long, Double, Double> result = graph
-				.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
-						new UpdateDistance(), maxIterations);
+		Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(
+				new CalculateDistances(), new ChooseMinDistance(), new UpdateDistance(), maxIterations);
 
 		// Extract the vertices as the result
 		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
@@ -73,6 +84,10 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path UDFs
+	// --------------------------------------------------------------------------------------------
+
 	@SuppressWarnings("serial")
 	private static final class InitVertices implements MapFunction<Long, Double>{
 
@@ -92,10 +107,6 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 		}
 	}
 
-	// --------------------------------------------------------------------------------------------
-	//  Single Source Shortest Path UDFs
-	// --------------------------------------------------------------------------------------------
-
 	@SuppressWarnings("serial")
 	private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index c6a776d..6c4d0c2 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -30,8 +30,9 @@ import org.apache.flink.graph.example.utils.ExampleUtils;
 import org.apache.flink.types.NullValue;
 
 /**
+ * This example illustrate how to use Gelly metrics methods and get simple statistics
+ * from the input graph.  
  * 
- * A simple example to illustrate the basic functionality of the graph-api.
  * The program creates a random graph and computes and prints
  * the following metrics:
  * - number of vertices

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
index 8d94cbc..cc672b2 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
@@ -35,11 +35,11 @@ import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
- * Incremental Single Sink Shortest Paths Example. Shortest Paths are incrementally updated
- * upon edge removal.
- *
  * This example illustrates the usage of vertex-centric iteration's
  * messaging direction configuration option.
+ * 
+ * Incremental Single Sink Shortest Paths Example. Shortest Paths are incrementally updated
+ * upon edge removal.
  *
  * The program takes as input the resulted graph after a SSSP computation,
  * an edge to be removed and the initial graph(i.e. before SSSP was computed).

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
index 79de407..0f84dbb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
@@ -34,6 +34,13 @@ import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
 import java.util.HashSet;
 
 /**
+ * This example shows how to use
+ * <ul>
+ *  <li> neighborhood methods
+ *  <li> join with vertices
+ *  <li> triplets
+ * </ul>
+ * 
  * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
  * to the Jaccard similarity coefficient - the number of common neighbors divided by the the size
  * of the union of neighbor sets - for the src and target vertices.
@@ -117,8 +124,7 @@ public class JaccardSimilarityMeasure implements ProgramDescription {
 	private static final class GatherNeighbors implements ReduceNeighborsFunction<HashSet<Long>> {
 
 		@Override
-		public HashSet<Long> reduceNeighbors(HashSet<Long> first,
-											HashSet<Long> second) {
+		public HashSet<Long> reduceNeighbors(HashSet<Long> first, HashSet<Long> second) {
 			first.addAll(second);
 			return new HashSet<Long>(first);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
deleted file mode 100644
index bee5af3..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.library.LabelPropagationAlgorithm;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-/**
- * This example uses the label propagation algorithm to detect communities by
- * propagating labels. Initially, each vertex is assigned its id as its label.
- * The vertices iteratively propagate their labels to their neighbors and adopt
- * the most frequent label among their neighbors. The algorithm converges when
- * no vertex changes value or the maximum number of iterations have been
- * reached.
- *
- * The edges input file is expected to contain one edge per line, with long IDs
- * in the following format:"<sourceVertexID>\t<targetVertexID>".
- *
- * The vertices input file is expected to contain one vertex per line, with long IDs
- * and long vertex values, in the following format:"<vertexID>\t<vertexValue>".
- *
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- */
-public class LabelPropagation implements ProgramDescription {
-
-	public static void main(String[] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		// Set up the execution environment
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// Set up the graph
-		DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
-		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
-
-		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges,	env);
-
-		// Set up the program
-		DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
-				new LabelPropagationAlgorithm<Long>(maxIterations)).getVertices();
-
-		// Emit results
-		if(fileOutput) {
-			verticesWithCommunity.writeAsCsv(outputPath, "\n", ",");
-
-			// Execute the program
-			env.execute("Label Propagation Example");
-		} else {
-			verticesWithCommunity.print();
-		}
-
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String vertexInputPath = null;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-	private static long numVertices = 100;
-	private static int maxIterations = 10;
-
-	private static boolean parseParameters(String[] args) {
-
-		if(args.length > 0) {
-			if(args.length != 4) {
-				System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
-				return false;
-			}
-
-			fileOutput = true;
-			vertexInputPath = args[0];
-			edgeInputPath = args[1];
-			outputPath = args[2];
-			maxIterations = Integer.parseInt(args[3]);
-		} else {
-			System.out.println("Executing LabelPropagation example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
-		}
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
-
-		if (fileOutput) {
-			return env.readCsvFile(vertexInputPath)
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new Tuple2ToVertexMap<Long, Long>());
-		}
-
-		return env.generateSequence(1, numVertices).map(
-				new MapFunction<Long, Vertex<Long, Long>>() {
-					public Vertex<Long, Long> map(Long l) throws Exception {
-						return new Vertex<Long, Long>(l, l);
-					}
-				});
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
-
-		if (fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-						@Override
-						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
-							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
-						}
-					});
-		}
-
-		return env.generateSequence(1, numVertices).flatMap(
-				new FlatMapFunction<Long, Edge<Long, NullValue>>() {
-					@Override
-					public void flatMap(Long key,
-							Collector<Edge<Long, NullValue>> out) {
-						int numOutEdges = (int) (Math.random() * (numVertices / 2));
-						for (int i = 0; i < numOutEdges; i++) {
-							long target = (long) (Math.random() * numVertices) + 1;
-							out.collect(new Edge<Long, NullValue>(key, target,
-									NullValue.getInstance()));
-						}
-					}
-				});
-	}
-
-	@Override
-	public String getDescription() {
-		return "Label Propagation Example";
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index 0fc45bd..7643976 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -42,32 +42,32 @@ import org.apache.flink.graph.library.LabelPropagationAlgorithm;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
+/**
+ * This example demonstrates how to mix the DataSet Flink API with the Gelly API.
+ * The input is a set <userId - songId - playCount> triplets and
+ * a set of bad records, i.e. song ids that should not be trusted.
+ * Initially, we use the DataSet API to filter out the bad records.
+ * Then, we use Gelly to create a user -> song weighted bipartite graph and compute
+ * the top song (most listened) per user.
+ * Then, we use the DataSet API again, to create a user-user similarity graph,
+ * based on common songs, where users that are listeners of the same song
+ * are connected. A user-defined threshold on the playcount value
+ * defines when a user is considered to be a listener of a song.
+ * Finally, we use the graph API to run the label propagation community detection algorithm on
+ * the similarity graph.
+ *
+ * The triplets input is expected to be given as one triplet per line,
+ * in the following format: "<userID>\t<songID>\t<playcount>".
+ *
+ * The mismatches input file is expected to contain one mismatch record per line,
+ * in the following format:
+ * "ERROR: <songID trackID> song_title"
+ *
+ * If no arguments are provided, the example runs with default data from {@link MusicProfilesData}.
+ */
 @SuppressWarnings("serial")
 public class MusicProfiles implements ProgramDescription {
 
-	/**
-	 * This example demonstrates how to mix the DataSet Flink API with the Gelly API.
-	 * The input is a set <userId - songId - playCount> triplets and
-	 * a set of bad records, i.e. song ids that should not be trusted.
-	 * Initially, we use the DataSet API to filter out the bad records.
-	 * Then, we use Gelly to create a user -> song weighted bipartite graph and compute
-	 * the top song (most listened) per user.
-	 * Then, we use the DataSet API again, to create a user-user similarity graph,
-	 * based on common songs, where users that are listeners of the same song
-	 * are connected. A user-defined threshold on the playcount value
-	 * defines when a user is considered to be a listener of a song.
-	 * Finally, we use the graph API to run the label propagation community detection algorithm on
-	 * the similarity graph.
-	 *
-	 * The triplets input is expected to be given as one triplet per line,
-	 * in the following format: "<userID>\t<songID>\t<playcount>".
-	 *
-	 * The mismatches input file is expected to contain one mismatch record per line,
-	 * in the following format:
-	 * "ERROR: <songID trackID> song_title"
-	 *
-	 * If no arguments are provided, the example runs with default data from {@link MusicProfilesData}.
-	 */
 	public static void main(String[] args) throws Exception {
 
 		if (!parseParameters(args)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
deleted file mode 100644
index 10b4be4..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.library.PageRankAlgorithm;
-import org.apache.flink.util.Collector;
-
-/**
- * This example implements a simple PageRank algorithm, using a vertex-centric iteration.
- *
- * The edges input file is expected to contain one edge per line, with long IDs and no
- * values, in the following format:"<sourceVertexID>\t<targetVertexID>".
- *
- * If no arguments are provided, the example runs with a random graph of 10 vertices.
- *
- */
-public class PageRank implements ProgramDescription {
-
-	@SuppressWarnings("serial")
-	public static void main(String[] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
-
-		Graph<Long, Double, Double> network = Graph.fromDataSet(links, new MapFunction<Long, Double>() {
-
-			public Double map(Long value) throws Exception {
-				return 1.0;
-			}
-		}, env);
-
-		DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
-
-		// assign the transition probabilities as the edge weights
-		Graph<Long, Double, Double> networkWithWeights = network
-				.joinWithEdgesOnSource(vertexOutDegrees,
-						new MapFunction<Tuple2<Double, Long>, Double>() {
-							public Double map(Tuple2<Double, Long> value) {
-								return value.f0 / value.f1;
-							}
-						});
-
-		DataSet<Vertex<Long, Double>> pageRanks = networkWithWeights.run(
-				new PageRankAlgorithm<Long>(DAMPENING_FACTOR, maxIterations))
-				.getVertices();
-
-		if (fileOutput) {
-			pageRanks.writeAsCsv(outputPath, "\n", "\t");
-
-			// since file sinks are lazy, we trigger the execution explicitly
-			env.execute();
-		} else {
-			pageRanks.print();
-		}
-
-	}
-
-	@Override
-	public String getDescription() {
-		return "PageRank example";
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static final double DAMPENING_FACTOR = 0.85;
-	private static long numPages = 10;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-	private static int maxIterations = 10;
-
-	private static boolean parseParameters(String[] args) {
-
-		if(args.length > 0) {
-			if(args.length != 3) {
-				System.err.println("Usage: PageRank <input edges path> <output path> <num iterations>");
-				return false;
-			}
-
-			fileOutput = true;
-			edgeInputPath = args[0];
-			outputPath = args[1];
-			maxIterations = Integer.parseInt(args[2]);
-		} else {
-			System.out.println("Executing PageRank example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: PageRank <input edges path> <output path> <num iterations>");
-		}
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
-
-		if (fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
-						public Edge<Long, Double> map(Tuple2<Long, Long> input) {
-							return new Edge<Long, Double>(input.f0, input.f1, 1.0);
-						}
-					}).withForwardedFields("f0; f1");
-		}
-
-		return env.generateSequence(1, numPages).flatMap(
-				new FlatMapFunction<Long, Edge<Long, Double>>() {
-					@Override
-					public void flatMap(Long key,
-							Collector<Edge<Long, Double>> out) throws Exception {
-						int numOutEdges = (int) (Math.random() * (numPages / 2));
-						for (int i = 0; i < numOutEdges; i++) {
-							long target = (long) (Math.random() * numPages) + 1;
-							out.collect(new Edge<Long, Double>(key, target, 1.0));
-						}
-					}
-				});
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
index 9d7d2c2..391ebaf 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
@@ -26,23 +26,28 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.graph.library.SingleSourceShortestPathsAlgorithm;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
- * This example implements the Single Source Shortest Paths algorithm,
- * using a vertex-centric iteration.
+ * This example shows how to use Gelly's vertex-centric iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ * For a gather-sum-apply implementation of the same algorithm, please refer to {@link GSASingleSourceShortestPaths}. 
  *
- * The input file is expected to contain one edge per line, with long IDs
- * and double weights, separated by tabs, in the following format:
- * "<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
- *
- * If no arguments are provided, the example runs with default data from {@link SingleSourceShortestPathsData}.
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
  *
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
  */
 public class SingleSourceShortestPaths implements ProgramDescription {
 
-	@SuppressWarnings("serial")
 	public static void main(String[] args) throws Exception {
 
 		if (!parseParameters(args)) {
@@ -53,17 +58,14 @@ public class SingleSourceShortestPaths implements ProgramDescription {
 
 		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
 
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges,
-				new MapFunction<Long, Double>() {
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
 
-					public Double map(Long value) {
-						return Double.MAX_VALUE;
-					}
-		}, env);
+		// Execute the vertex-centric iteration
+		Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
+				new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
 
-		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
-				.run(new SingleSourceShortestPathsAlgorithm<Long>(srcVertexId, maxIterations))
-				.getVertices();
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
 
 		// emit result
 		if (fileOutput) {
@@ -77,9 +79,66 @@ public class SingleSourceShortestPaths implements ProgramDescription {
 
 	}
 
-	@Override
-	public String getDescription() {
-		return "Single Source Shortest Paths";
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path UDFs
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Double>{
+
+		private long srcId;
+
+		public InitVertices(long srcId) {
+			this.srcId = srcId;
+		}
+
+		public Double map(Long id) {
+			if (id.equals(srcId)) {
+				return 0.0;
+			}
+			else {
+				return Double.POSITIVE_INFINITY;
+			}
+		}
+	}
+
+	/**
+	 * Function that updates the value of a vertex by picking the minimum
+	 * distance from all incoming messages.
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
+
+			Double minDistance = Double.MAX_VALUE;
+
+			for (double msg : inMessages) {
+				if (msg < minDistance) {
+					minDistance = msg;
+				}
+			}
+
+			if (vertex.getValue() > minDistance) {
+				setNewVertexValue(minDistance);
+			}
+		}
+	}
+
+	/**
+	 * Distributes the minimum distance associated with a given vertex among all
+	 * the target vertices summed up with the edge's value.
+	 */
+	@SuppressWarnings("serial")
+	public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> {
+
+		@Override
+		public void sendMessages(Vertex<Long, Double> vertex) {
+			for (Edge<Long, Double> edge : getEdges()) {
+				sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+			}
+		}
 	}
 
 	// ******************************************************************************************************************
@@ -132,4 +191,9 @@ public class SingleSourceShortestPaths implements ProgramDescription {
 			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
 		}
 	}
+
+	@Override
+	public String getDescription() {
+		return "Vertex-centric Single Source Shortest Paths";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
new file mode 100755
index 0000000..9b75c92
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration
+ */
+public class GSAConnectedComponents implements ProgramDescription {
+
+	// --------------------------------------------------------------------------------------------
+	//  Program
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
+
+		// Execute the GSA iteration
+		Graph<Long, Long, NullValue> result =
+				graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
+						new UpdateComponentId(), maxIterations);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
+
+		// emit result
+		if (fileOutput) {
+			connectedComponents.writeAsCsv(outputPath, "\n", " ");
+
+			// since file sinks are lazy, we trigger the execution explicitly
+			env.execute("GSA Connected Components");
+		} else {
+			connectedComponents.print();
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitVertices	implements MapFunction<Long, Long> {
+
+		public Long map(Long vertexId) {
+			return vertexId;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Connected Components UDFs
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
+
+		public Long gather(Neighbor<Long, NullValue> neighbor) {
+			return neighbor.getNeighborValue();
+		}
+	};
+
+	@SuppressWarnings("serial")
+	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
+
+		public Long sum(Long newValue, Long currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	};
+
+	@SuppressWarnings("serial")
+	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
+
+		public void apply(Long summedValue, Long origValue) {
+			if (summedValue < origValue) {
+				setResult(summedValue);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Util methods
+	// --------------------------------------------------------------------------------------------
+
+	private static boolean fileOutput = false;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static int maxIterations = 16;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+
+			if (args.length != 3) {
+				System.err.println("Usage: GSAConnectedComponents <edge path> " +
+						"<result path> <max iterations>");
+				return false;
+			}
+
+			edgeInputPath = args[0];
+			outputPath = args[1];
+			maxIterations = Integer.parseInt(args[2]);
+		} else {
+			System.out.println("Executing GSA Connected Components example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: GSAConnectedComponents <edge path> <result path> <max iterations>");
+		}
+		return true;
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+						}
+					});
+		}
+
+		// Generates 3 components of size 2
+		return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+			@Override
+			public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
+				out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
+			}
+		});
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Connected Components";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
new file mode 100644
index 0000000..45d4555
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example implements a simple PageRank algorithm, using a gather-sum-apply iteration.
+ *
+ * The edges input file is expected to contain one edge per line, with long IDs and no
+ * values, in the following format:"<sourceVertexID>\t<targetVertexID>".
+ *
+ * If no arguments are provided, the example runs with a random graph of 10 vertices
+ * and random edge weights.
+ */
+public class GSAPageRank implements ProgramDescription {
+
+	@SuppressWarnings("serial")
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
+
+		Graph<Long, Double, Double> network = Graph.fromDataSet(links, new MapFunction<Long, Double>() {
+
+			@Override
+			public Double map(Long value) throws Exception {
+				return 1.0;
+			}
+		}, env);
+
+		DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
+
+		// Assign the transition probabilities as the edge weights
+		Graph<Long, Double, Double> networkWithWeights = network
+				.joinWithEdgesOnSource(vertexOutDegrees,
+						new MapFunction<Tuple2<Double, Long>, Double>() {
+
+							@Override
+							public Double map(Tuple2<Double, Long> value) {
+								return value.f0 / value.f1;
+							}
+						});
+
+		long numberOfVertices = networkWithWeights.numberOfVertices();
+
+		// Execute the GSA iteration
+		Graph<Long, Double, Double> result = networkWithWeights
+				.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
+						new UpdateRanks(numberOfVertices), maxIterations);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Double>> pageRanks = result.getVertices();
+
+		// emit result
+		if (fileOutput) {
+			pageRanks.writeAsCsv(outputPath, "\n", "\t");
+
+			// since file sinks are lazy, we trigger the execution explicitly
+			env.execute("GSA Page Ranks");
+		} else {
+			pageRanks.print();
+		}
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Page Rank UDFs
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class GatherRanks extends GatherFunction<Double, Double, Double> {
+
+		long numberOfVertices;
+
+		public GatherRanks(long numberOfVertices) {
+			this.numberOfVertices = numberOfVertices;
+		}
+
+		@Override
+		public Double gather(Neighbor<Double, Double> neighbor) {
+			double neighborRank = neighbor.getNeighborValue();
+
+			if(getSuperstepNumber() == 1) {
+				neighborRank = 1.0 / numberOfVertices;
+			}
+
+			return neighborRank * neighbor.getEdgeValue();
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumRanks extends SumFunction<Double, Double, Double> {
+
+		@Override
+		public Double sum(Double newValue, Double currentValue) {
+			return newValue + currentValue;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class UpdateRanks extends ApplyFunction<Long, Double, Double> {
+
+		long numberOfVertices;
+
+		public UpdateRanks(long numberOfVertices) {
+			this.numberOfVertices = numberOfVertices;
+		}
+
+		@Override
+		public void apply(Double rankSum, Double currentValue) {
+			setResult((1-DAMPENING_FACTOR)/numberOfVertices + DAMPENING_FACTOR * rankSum);
+		}
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static final double DAMPENING_FACTOR = 0.85;
+	private static long numPages = 10;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+	private static int maxIterations = 10;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 3) {
+				System.err.println("Usage: GSAPageRank <input edges path> <output path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+			maxIterations = Integer.parseInt(args[2]);
+		} else {
+			System.out.println("Executing GSAPageRank example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: GSAPageRank <input edges path> <output path> <num iterations>");
+		}
+		return true;
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
+
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+						public Edge<Long, Double> map(Tuple2<Long, Long> input) {
+							return new Edge<Long, Double>(input.f0, input.f1, 1.0);
+						}
+					}).withForwardedFields("f0; f1");
+		}
+
+		return env.generateSequence(1, numPages).flatMap(
+				new FlatMapFunction<Long, Edge<Long, Double>>() {
+					@Override
+					public void flatMap(Long key,
+										Collector<Edge<Long, Double>> out) throws Exception {
+						int numOutEdges = (int) (Math.random() * (numPages / 2));
+						for (int i = 0; i < numOutEdges; i++) {
+							long target = (long) (Math.random() * numPages) + 1;
+							out.collect(new Edge<Long, Double>(key, target, 1.0));
+						}
+					}
+				});
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Page Rank";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
new file mode 100755
index 0000000..b01aa23
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
+ */
+public class GSASingleSourceShortestPaths implements ProgramDescription {
+
+	// --------------------------------------------------------------------------------------------
+	//  Program
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
+
+		// Execute the GSA iteration
+		Graph<Long, Double, Double> result = graph
+				.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
+						new UpdateDistance(), maxIterations);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+		// emit result
+		if(fileOutput) {
+			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
+
+			// since file sinks are lazy, we trigger the execution explicitly
+			env.execute("GSA Single Source Shortest Paths");
+		} else {
+			singleSourceShortestPaths.print();
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Double>{
+
+		private long srcId;
+
+		public InitVertices(long srcId) {
+			this.srcId = srcId;
+		}
+
+		public Double map(Long id) {
+			if (id.equals(srcId)) {
+				return 0.0;
+			}
+			else {
+				return Double.POSITIVE_INFINITY;
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path UDFs
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
+
+		public Double gather(Neighbor<Double, Double> neighbor) {
+			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
+		}
+	};
+
+	@SuppressWarnings("serial")
+	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
+
+		public Double sum(Double newValue, Double currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	};
+
+	@SuppressWarnings("serial")
+	private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
+
+		public void apply(Double newDistance, Double oldDistance) {
+			if (newDistance < oldDistance) {
+				setResult(newDistance);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Util methods
+	// --------------------------------------------------------------------------------------------
+
+	private static boolean fileOutput = false;
+
+	private static Long srcVertexId = 1l;
+
+	private static String edgesInputPath = null;
+
+	private static String outputPath = null;
+
+	private static int maxIterations = 5;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			if(args.length != 4) {
+				System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			srcVertexId = Long.parseLong(args[0]);
+			edgesInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+				System.out.println("Executing GSASingle Source Shortest Paths example "
+						+ "with default parameters and built-in default data.");
+				System.out.println("  Provide parameters to read input data from files.");
+				System.out.println("  See the documentation for the correct format of input files.");
+				System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+		}
+		return true;
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class, Double.class)
+					.map(new Tuple3ToEdgeMap<Long, Double>());
+		} else {
+			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+		}
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Single Source Shortest Paths";
+	}
+}


Mime
View raw message