flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [3/4] flink git commit: [FLINK-3618] [gelly] Rename abstract UDF classes in Scatter-Gather implementation
Date Thu, 30 Jun 2016 18:57:51 GMT
[FLINK-3618] [gelly] Rename abstract UDF classes in Scatter-Gather implementation

Rename MessageFunction to ScatterFunction
and VertexUpdateFunction to GatherFunction.

Change the parameter order in
  Graph.runScatterGatherIteration(VertexUpdateFunction, MessagingFunction)
to
  Graph.runScatterGatherIteration(ScatterFunction, GatherFunction)

This closes #2184


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/918e5d0c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/918e5d0c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/918e5d0c

Branch: refs/heads/master
Commit: 918e5d0c9dae7a080dcf2505df38af4c655ba6b6
Parents: 6c6b17b
Author: Greg Hogan <code@greghogan.com>
Authored: Fri May 6 15:39:38 2016 -0400
Committer: Greg Hogan <code@greghogan.com>
Committed: Thu Jun 30 13:04:32 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  78 ++--
 .../flink/graph/examples/HITSAlgorithm.java     | 112 +++--
 .../flink/graph/examples/IncrementalSSSP.java   |  40 +-
 .../examples/SingleSourceShortestPaths.java     |  42 +-
 .../examples/SingleSourceShortestPaths.scala    |  47 ++-
 .../test/examples/IncrementalSSSPITCase.java    |   2 +-
 .../org/apache/flink/graph/scala/Graph.scala    | 318 +++++++--------
 .../main/java/org/apache/flink/graph/Graph.java |  30 +-
 .../flink/graph/IterationConfiguration.java     |   5 +-
 .../flink/graph/library/CommunityDetection.java |  46 +--
 .../graph/library/ConnectedComponents.java      |  54 +--
 .../graph/library/GSAConnectedComponents.java   |   2 +-
 .../library/GSASingleSourceShortestPaths.java   |   2 +-
 .../flink/graph/library/LabelPropagation.java   |  54 +--
 .../apache/flink/graph/library/PageRank.java    |  53 ++-
 .../library/SingleSourceShortestPaths.java      |  44 +-
 .../flink/graph/library/TriangleEnumerator.java |   6 +-
 .../flink/graph/spargel/GatherFunction.java     | 251 ++++++++++++
 .../flink/graph/spargel/MessagingFunction.java  | 338 ---------------
 .../flink/graph/spargel/ScatterFunction.java    | 338 +++++++++++++++
 .../spargel/ScatterGatherConfiguration.java     |  44 +-
 .../graph/spargel/ScatterGatherIteration.java   | 407 +++++++++----------
 .../graph/spargel/VertexUpdateFunction.java     | 251 ------------
 .../graph/spargel/SpargelCompilerTest.java      |  12 +-
 .../graph/spargel/SpargelTranslationTest.java   | 119 +++---
 .../test/CollectionModeSuperstepITCase.java     |  35 +-
 .../test/ScatterGatherConfigurationITCase.java  | 227 +++++------
 27 files changed, 1460 insertions(+), 1497 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index aa232fc..f063f09 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -1083,10 +1083,10 @@ final class Compute extends ComputeFunction {
 ### Scatter-Gather Iterations
 The scatter-gather model, also known as "signal/collect" model, expresses computation from the perspective of a vertex in the graph. The computation proceeds in synchronized iteration steps, called supersteps. In each superstep, a vertex produces messages for other vertices and updates its value based on the messages it receives. To use scatter-gather iterations in Gelly, the user only needs to define how a vertex behaves in each superstep:
 
-* <strong>Messaging</strong>:  corresponds to the scatter phase and produces the messages that a vertex will send to other vertices.
-* <strong>Value Update</strong>: corresponds to the gather phase and updates the vertex value using the received messages.
+* <strong>Scatter</strong>:  produces the messages that a vertex will send to other vertices.
+* <strong>Gather</strong>: updates the vertex value using received messages.
 
-Gelly provides methods for scatter-gather iterations. The user only needs to implement two functions, corresponding to the scatter and gather phases. The first function is a `MessagingFunction`, which allows a vertex to send out messages for other vertices. Messages are recieved during the same superstep as they are sent. The second function is `VertexUpdateFunction`, which defines how a vertex will update its value based on the received messages.
+Gelly provides methods for scatter-gather iterations. The user only needs to implement two functions, corresponding to the scatter and gather phases. The first function is a `ScatterFunction`, which allows a vertex to send out messages to other vertices. Messages are received during the same superstep as they are sent. The second function is `GatherFunction`, which defines how a vertex will update its value based on the received messages.
 These functions and the maximum number of iterations to run are given as parameters to Gelly's `runScatterGatherIteration`. This method will execute the scatter-gather iteration on the input Graph and return a new Graph, with updated vertex values.
 
 A scatter-gather iteration can be extended with information such as the total number of vertices, the in degree and out degree.
@@ -1109,7 +1109,7 @@ int maxIterations = 10;
 
 // Execute the scatter-gather iteration
 Graph<Long, Double, Double> result = graph.runScatterGatherIteration(
-			new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
+			new MinDistanceMessenger(), new VertexDistanceUpdater(), maxIterations);
 
 // Extract the vertices as the result
 DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
@@ -1118,7 +1118,7 @@ DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
 // - - -  UDFs - - - //
 
 // scatter: messaging
-public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> {
+public static final class MinDistanceMessenger extends ScatterFunction<Long, Double, Double, Double> {
 
 	public void sendMessages(Vertex<Long, Double> vertex) {
 		for (Edge<Long, Double> edge : getEdges()) {
@@ -1128,7 +1128,7 @@ public static final class MinDistanceMessenger extends MessagingFunction<Long, D
 }
 
 // gather: vertex update
-public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
+public static final class VertexDistanceUpdater extends GatherFunction<Long, Double, Double> {
 
 	public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
 		Double minDistance = Double.MAX_VALUE;
@@ -1157,7 +1157,7 @@ val graph: Graph[Long, Double, Double] = ...
 val maxIterations = 10
 
 // Execute the scatter-gather iteration
-val result = graph.runScatterGatherIteration(new VertexDistanceUpdater, new MinDistanceMessenger, maxIterations)
+val result = graph.runScatterGatherIteration(new MinDistanceMessenger, new VertexDistanceUpdater, maxIterations)
 
 // Extract the vertices as the result
 val singleSourceShortestPaths = result.getVertices
@@ -1166,7 +1166,7 @@ val singleSourceShortestPaths = result.getVertices
 // - - -  UDFs - - - //
 
 // messaging
-final class MinDistanceMessenger extends MessagingFunction[Long, Double, Double, Double] {
+final class MinDistanceMessenger extends ScatterFunction[Long, Double, Double, Double] {
 
 	override def sendMessages(vertex: Vertex[Long, Double]) = {
 		for (edge: Edge[Long, Double] <- getEdges) {
@@ -1176,7 +1176,7 @@ final class MinDistanceMessenger extends MessagingFunction[Long, Double, Double,
 }
 
 // vertex update
-final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
+final class VertexDistanceUpdater extends GatherFunction[Long, Double, Double] {
 
 	override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) = {
 		var minDistance = Double.MaxValue
@@ -1211,9 +1211,9 @@ and can be specified using the `setName()` method.
 * <strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink's internal way of keeping objects in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the `setSolutionSetUnmanagedMemory()` method.
 
 * <strong>Aggregators</strong>: Iteration aggregators can be registered using the `registerAggregator()` method. An iteration aggregator combines
-all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined `VertexUpdateFunction` and `MessagingFunction`.
+all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined `ScatterFunction` and `GatherFunction`.
 
-* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables]({{site.baseurl}}/apis/batch/index.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.
+* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables]({{site.baseurl}}/apis/batch/index.html#broadcast-variables) to the `ScatterFunction` and `GatherFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.
 
 * <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method.
 The number of vertices can then be accessed in the vertex update function and in the messaging function using the `getNumberOfVertices()` method. If the option is not set in the configuration, this method will return -1.
@@ -1245,10 +1245,12 @@ parameters.registerAggregator("sumAggregator", new LongSumAggregator());
 // run the scatter-gather iteration, also passing the configuration parameters
 Graph<Long, Double, Double> result =
 			graph.runScatterGatherIteration(
-			new VertexUpdater(), new Messenger(), maxIterations, parameters);
+			new Messenger(), new VertexUpdater(), maxIterations, parameters);
 
 // user-defined functions
-public static final class VertexUpdater extends VertexUpdateFunction {
+public static final class Messenger extends ScatterFunction {...}
+
+public static final class VertexUpdater extends GatherFunction {
 
 	LongSumAggregator aggregator = new LongSumAggregator();
 
@@ -1272,8 +1274,6 @@ public static final class VertexUpdater extends VertexUpdateFunction {
 	}
 }
 
-public static final class Messenger extends MessagingFunction {...}
-
 {% endhighlight %}
 </div>
 
@@ -1294,10 +1294,12 @@ parameters.setParallelism(16)
 parameters.registerAggregator("sumAggregator", new LongSumAggregator)
 
 // run the scatter-gather iteration, also passing the configuration parameters
-val result = graph.runScatterGatherIteration(new VertexUpdater, new Messenger, maxIterations, parameters)
+val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, maxIterations, parameters)
 
 // user-defined functions
-final class VertexUpdater extends VertexUpdateFunction {
+final class Messenger extends ScatterFunction {...}
+
+final class VertexUpdater extends GatherFunction {
 
 	var aggregator = new LongSumAggregator
 
@@ -1321,8 +1323,6 @@ final class VertexUpdater extends VertexUpdateFunction {
 	}
 }
 
-final class Messenger extends MessagingFunction {...}
-
 {% endhighlight %}
 </div>
 </div>
@@ -1347,20 +1347,20 @@ parameters.setOptDegrees(true);
 // run the scatter-gather iteration, also passing the configuration parameters
 Graph<Long, Double, Double> result =
 			graph.runScatterGatherIteration(
-			new VertexUpdater(), new Messenger(), maxIterations, parameters);
+			new Messenger(), new VertexUpdater(), maxIterations, parameters);
 
 // user-defined functions
-public static final class VertexUpdater {
+public static final class Messenger extends ScatterFunction {
 	...
-	// get the number of vertices
-	long numVertices = getNumberOfVertices();
+	// retrieve the vertex out-degree
+	outDegree = getOutDegree();
 	...
 }
 
-public static final class Messenger {
+public static final class VertexUpdater extends GatherFunction {
 	...
-	// retrieve the vertex out-degree
-	outDegree = getOutDegree();
+	// get the number of vertices
+	long numVertices = getNumberOfVertices();
 	...
 }
 
@@ -1382,20 +1382,20 @@ parameters.setOptNumVertices(true)
 parameters.setOptDegrees(true)
 
 // run the scatter-gather iteration, also passing the configuration parameters
-val result = graph.runScatterGatherIteration(new VertexUpdater, new Messenger, maxIterations, parameters)
+val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, maxIterations, parameters)
 
 // user-defined functions
-final class VertexUpdater {
+final class Messenger extends ScatterFunction {
 	...
-	// get the number of vertices
-	val numVertices = getNumberOfVertices
+	// retrieve the vertex out-degree
+	val outDegree = getOutDegree
 	...
 }
 
-final class Messenger {
+final class VertexUpdater extends GatherFunction {
 	...
-	// retrieve the vertex out-degree
-	val outDegree = getOutDegree
+	// get the number of vertices
+	val numVertices = getNumberOfVertices
 	...
 }
 
@@ -1419,13 +1419,13 @@ parameters.setDirection(EdgeDirection.IN);
 // run the scatter-gather iteration, also passing the configuration parameters
 DataSet<Vertex<Long, HashSet<Long>>> result =
 			graph.runScatterGatherIteration(
-			new VertexUpdater(), new Messenger(), maxIterations, parameters)
+			new Messenger(), new VertexUpdater(), maxIterations, parameters)
 			.getVertices();
 
 // user-defined functions
-public static final class VertexUpdater {...}
+public static final class Messenger extends GatherFunction {...}
 
-public static final class Messenger {...}
+public static final class VertexUpdater extends ScatterFunction {...}
 
 {% endhighlight %}
 </div>
@@ -1441,13 +1441,13 @@ val parameters = new ScatterGatherConfiguration
 parameters.setDirection(EdgeDirection.IN)
 
 // run the scatter-gather iteration, also passing the configuration parameters
-val result = graph.runScatterGatherIteration(new VertexUpdater, new Messenger, maxIterations, parameters)
+val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, maxIterations, parameters)
 			.getVertices
 
 // user-defined functions
-final class VertexUpdater {...}
+final class Messenger extends ScatterFunction {...}
 
-final class Messenger {...}
+final class VertexUpdater extends GatherFunction {...}
 
 {% endhighlight %}
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java
index 129d2a6..ff5a2e9 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java
@@ -21,17 +21,16 @@ package org.apache.flink.graph.examples;
 import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
-
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.util.Preconditions;
 
@@ -100,17 +99,55 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
 		parameter.registerAggregator("diffValueSum", new DoubleSumAggregator());
 
 		return newGraph
-				.runScatterGatherIteration(new VertexUpdate<K>(maxIterations, convergeThreshold),
-						new MessageUpdate<K>(maxIterations), maxIterations, parameter)
+				.runScatterGatherIteration(new MessageUpdate<K>(maxIterations),
+						new VertexUpdate<K>(maxIterations, convergeThreshold), maxIterations, parameter)
 				.getVertices();
 	}
 
 	/**
+	 * Distributes the value of a vertex among all neighbor vertices and sum all the
+	 * value in every superstep.
+	 */
+	private static final class MessageUpdate<K> extends ScatterFunction<K, Tuple2<DoubleValue, DoubleValue>, Double, Boolean> {
+		private int maxIteration;
+
+		public MessageUpdate(int maxIteration) {
+			this.maxIteration = maxIteration;
+		}
+
+		@Override
+		public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) {
+			// in the first iteration, no aggregation to call, init sum with value of vertex
+			double iterationValueSum = 1.0;
+
+			if (getSuperstepNumber() > 1) {
+				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("updatedValueSum")).getValue());
+			}
+			for (Edge<K, Boolean> edge : getEdges()) {
+				if (getSuperstepNumber() != maxIteration) {
+					if (getSuperstepNumber() % 2 == 1) {
+						if (edge.getValue()) {
+							sendMessageTo(edge.getTarget(), vertex.getValue().f0.getValue() / iterationValueSum);
+						}
+					} else {
+						if (!edge.getValue()) {
+							sendMessageTo(edge.getTarget(), vertex.getValue().f1.getValue() / iterationValueSum);
+						}
+					}
+				} else {
+					if (!edge.getValue()) {
+						sendMessageTo(edge.getTarget(), iterationValueSum);
+					}
+				}
+			}
+		}
+	}
+
+	/**
 	 * Function that updates the value of a vertex by summing up the partial
 	 * values from all messages and normalize the value.
 	 */
-	@SuppressWarnings("serial")
-	public static final class VertexUpdate<K> extends VertexUpdateFunction<K, Tuple2<DoubleValue, DoubleValue>, Double> {
+	private static final class VertexUpdate<K> extends GatherFunction<K, Tuple2<DoubleValue, DoubleValue>, Double> {
 		private int maxIteration;
 		private double convergeThreshold;
 		private DoubleSumAggregator updatedValueSumAggregator;
@@ -162,7 +199,7 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
 						diffValueSum = ((DoubleValue) getPreviousIterationAggregate("diffValueSum")).getValue();
 					}
 					authoritySumAggregator.aggregate(previousAuthAverage);
-					
+
 					if (diffValueSum > convergeThreshold) {
 						newHubValue.setValue(newHubValue.getValue() / iterationValueSum);
 						newAuthorityValue.setValue(updateValue);
@@ -191,77 +228,24 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
 		}
 	}
 
-	/**
-	 * Distributes the value of a vertex among all neighbor vertices and sum all the
-	 * value in every superstep.
-	 */
-	@SuppressWarnings("serial")
-	public static final class MessageUpdate<K> extends MessagingFunction<K, Tuple2<DoubleValue, DoubleValue>, Double, Boolean> {
-		private int maxIteration;
-
-		public MessageUpdate(int maxIteration) {
-			this.maxIteration = maxIteration;
-		}
-
-		@Override
-		public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) {
-
-			// in the first iteration, no aggregation to call, init sum with value of vertex
-			double iterationValueSum = 1.0;
-
-			if (getSuperstepNumber() > 1) {
-				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("updatedValueSum")).getValue());
-			}
-			for (Edge<K, Boolean> edge : getEdges()) {
-				if (getSuperstepNumber() != maxIteration) {
-					if (getSuperstepNumber() % 2 == 1) {
-						if (edge.getValue()) {
-							sendMessageTo(edge.getTarget(), vertex.getValue().f0.getValue() / iterationValueSum);
-						}
-					} else {
-						if (!edge.getValue()) {
-							sendMessageTo(edge.getTarget(), vertex.getValue().f1.getValue() / iterationValueSum);
-						}
-					}
-				} else {
-					if (!edge.getValue()) {
-						sendMessageTo(edge.getTarget(), iterationValueSum);
-					}
-				}
-			}
-		}
-	}
-
-	public static class VertexInitMapper<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<DoubleValue, DoubleValue>> {
-
-		private static final long serialVersionUID = 1L;
-
+	private static class VertexInitMapper<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<DoubleValue, DoubleValue>> {
 		private Tuple2<DoubleValue, DoubleValue> initVertexValue = new Tuple2<>(new DoubleValue(1.0), new DoubleValue(1.0));
 
 		public Tuple2<DoubleValue, DoubleValue> map(Vertex<K, VV> value) {
-
 			//init hub and authority value of each vertex
 			return initVertexValue;
 		}
 	}
 
-	public static class AuthorityEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, Boolean> {
-
-		private static final long serialVersionUID = 1L;
-
+	private static class AuthorityEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, Boolean> {
 		public Boolean map(Edge<K, EV> edge) {
-			
 			// mark edge as true for authority updating
 			return true;
 		}
 	}
 
-	public static class HubEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, Boolean> {
-
-		private static final long serialVersionUID = 1L;
-
+	private static class HubEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, Boolean> {
 		public Boolean map(Edge<K, EV> edge) {
-			
 			// mark edge as false for hub updating
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
index 26e419f..631384c 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/IncrementalSSSP.java
@@ -27,10 +27,10 @@ import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.examples.data.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 /**
  * This example illustrates how to 
@@ -97,8 +97,8 @@ public class IncrementalSSSP implements ProgramDescription {
 			parameters.setOptDegrees(true);
 
 			// run the scatter-gather iteration to propagate info
-			Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(new VertexDistanceUpdater(),
-					new InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters);
+			Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(new InvalidateMessenger(edgeToBeRemoved),
+					new VertexDistanceUpdater(), maxIterations, parameters);
 
 			DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();
 
@@ -147,22 +147,7 @@ public class IncrementalSSSP implements ProgramDescription {
 		}).count() > 0;
 	}
 
-	public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
-
-		@Override
-		public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
-			if (inMessages.hasNext()) {
-				Long outDegree = getOutDegree() - 1;
-				// check if the vertex has another SP-Edge
-				if (outDegree <= 0) {
-					// set own value to infinity
-					setNewVertexValue(Double.MAX_VALUE);
-				}
-			}
-		}
-	}
-
-	public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> {
+	public static final class InvalidateMessenger extends ScatterFunction<Long, Double, Double, Double> {
 
 		private Edge<Long, Double> edgeToBeRemoved;
 
@@ -190,6 +175,21 @@ public class IncrementalSSSP implements ProgramDescription {
 		}
 	}
 
+	public static final class VertexDistanceUpdater extends GatherFunction<Long, Double, Double> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
+			if (inMessages.hasNext()) {
+				Long outDegree = getOutDegree() - 1;
+				// check if the vertex has another SP-Edge
+				if (outDegree <= 0) {
+					// set own value to infinity
+					setNewVertexValue(Double.MAX_VALUE);
+				}
+			}
+		}
+	}
+
 	// ******************************************************************************************************************
 	// UTIL METHODS
 	// ******************************************************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
index c9abf02..68d20e0 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
@@ -62,7 +62,7 @@ public class SingleSourceShortestPaths implements ProgramDescription {
 
 		// Execute the scatter-gather iteration
 		Graph<Long, Double, Double> result = graph.runScatterGatherIteration(
-				new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
+				new MinDistanceMessenger(), new VertexDistanceUpdater(), maxIterations);
 
 		// Extract the vertices as the result
 		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
@@ -103,11 +103,28 @@ public class SingleSourceShortestPaths implements ProgramDescription {
 	}
 
 	/**
+	 * Distributes the minimum distance associated with a given vertex among all
+	 * the target vertices summed up with the edge's value.
+	 */
+	@SuppressWarnings("serial")
+	private static final class MinDistanceMessenger extends ScatterFunction<Long, Double, Double, Double> {
+
+		@Override
+		public void sendMessages(Vertex<Long, Double> vertex) {
+			if (vertex.getValue() < Double.POSITIVE_INFINITY) {
+				for (Edge<Long, Double> edge : getEdges()) {
+					sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+				}
+			}
+		}
+	}
+
+	/**
 	 * Function that updates the value of a vertex by picking the minimum
 	 * distance from all incoming messages.
 	 */
 	@SuppressWarnings("serial")
-	public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
+	private static final class VertexDistanceUpdater extends GatherFunction<Long, Double, Double> {
 
 		@Override
 		public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
@@ -126,23 +143,6 @@ public class SingleSourceShortestPaths implements ProgramDescription {
 		}
 	}
 
-	/**
-	 * Distributes the minimum distance associated with a given vertex among all
-	 * the target vertices summed up with the edge's value.
-	 */
-	@SuppressWarnings("serial")
-	public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> {
-
-		@Override
-		public void sendMessages(Vertex<Long, Double> vertex) {
-			if (vertex.getValue() < Double.POSITIVE_INFINITY) {
-				for (Edge<Long, Double> edge : getEdges()) {
-					sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
-				}
-			}
-		}
-	}
-
 	// ******************************************************************************************************************
 	// UTIL METHODS
 	// ******************************************************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
index 4f84bb0..2d623e7 100644
--- a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
+++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
@@ -22,11 +22,10 @@ import org.apache.flink.api.scala._
 import org.apache.flink.graph.scala._
 import org.apache.flink.graph.Edge
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.spargel.VertexUpdateFunction
-import org.apache.flink.graph.spargel.MessageIterator
+import org.apache.flink.graph.spargel.{MessageIterator, ScatterFunction, GatherFunction}
 import org.apache.flink.graph.Vertex
-import org.apache.flink.graph.spargel.MessagingFunction
 import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData
+
 import scala.collection.JavaConversions._
 import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
 
@@ -55,8 +54,8 @@ object SingleSourceShortestPaths {
     val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
 
     // Execute the scatter-gather iteration
-    val result = graph.runScatterGatherIteration(new VertexDistanceUpdater,
-      new MinDistanceMessenger, maxIterations)
+    val result = graph.runScatterGatherIteration(new MinDistanceMessenger,
+      new VertexDistanceUpdater, maxIterations)
 
     // Extract the vertices as the result
     val singleSourceShortestPaths = result.getVertices
@@ -86,10 +85,26 @@ object SingleSourceShortestPaths {
   }
 
   /**
-   * Function that updates the value of a vertex by picking the minimum
-   * distance from all incoming messages.
+   * Distributes the minimum distance associated with a given vertex among all
+   * the target vertices summed up with the edge's value.
    */
-  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
+  private final class MinDistanceMessenger extends
+    ScatterFunction[Long, Double, Double, Double] {
+
+    override def sendMessages(vertex: Vertex[Long, Double]) {
+      if (vertex.getValue < Double.PositiveInfinity) {
+        for (edge: Edge[Long, Double] <- getEdges) {
+          sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
+        }
+      }
+    }
+  }
+
+  /**
+    * Function that updates the value of a vertex by picking the minimum
+    * distance from all incoming messages.
+    */
+  private final class VertexDistanceUpdater extends GatherFunction[Long, Double, Double] {
 
     override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
       var minDistance = Double.MaxValue
@@ -105,22 +120,6 @@ object SingleSourceShortestPaths {
     }
   }
 
-  /**
-   * Distributes the minimum distance associated with a given vertex among all
-   * the target vertices summed up with the edge's value.
-   */
-  private final class MinDistanceMessenger extends
-    MessagingFunction[Long, Double, Double, Double] {
-
-    override def sendMessages(vertex: Vertex[Long, Double]) {
-      if (vertex.getValue < Double.PositiveInfinity) {
-        for (edge: Edge[Long, Double] <- getEdges) {
-          sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
-        }
-      }
-    }
-  }
-
   // ****************************************************************************
   // UTIL METHODS
   // ****************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
index d27dcd8..24b8cf1 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
@@ -111,8 +111,8 @@ public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
 
 			// run the scatter gather iteration to propagate info
 			Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(
-					new IncrementalSSSP.VertexDistanceUpdater(),
 					new IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved),
+					new IncrementalSSSP.VertexDistanceUpdater(),
 					IncrementalSSSPData.NUM_VERTICES, parameters);
 
 			DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index f7e13ba..4dd9d12 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -24,9 +24,9 @@ import org.apache.flink.api.java.{tuple => jtuple}
 import org.apache.flink.api.scala._
 import org.apache.flink.graph._
 import org.apache.flink.graph.asm.translate.TranslateFunction
-import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
+import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, SumFunction, GatherFunction => GSAGatherFunction}
 import org.apache.flink.graph.pregel.{ComputeFunction, MessageCombiner, VertexCentricConfiguration}
-import org.apache.flink.graph.spargel.{MessagingFunction, ScatterGatherConfiguration, VertexUpdateFunction}
+import org.apache.flink.graph.spargel.{ScatterFunction, ScatterGatherConfiguration, GatherFunction => SpargelGatherFunction}
 import org.apache.flink.graph.validation.GraphValidator
 import org.apache.flink.types.{LongValue, NullValue}
 import org.apache.flink.util.Preconditions
@@ -38,8 +38,8 @@ import _root_.scala.reflect.ClassTag
 object Graph {
 
   /**
-  * Creates a Graph from a DataSet of vertices and a DataSet of edges.
-  */
+   * Creates a Graph from a DataSet of vertices and a DataSet of edges.
+   */
   def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
                               env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -47,19 +47,19 @@ object Graph {
   }
 
   /**
-  * Creates a Graph from a DataSet of edges.
-  * Vertices are created automatically and their values are set to NullValue.
-  */
+   * Creates a Graph from a DataSet of edges.
+   * Vertices are created automatically and their values are set to NullValue.
+   */
   def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
   (edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
     wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet, env.getJavaEnv))
   }
 
   /**
-  * Creates a graph from a DataSet of edges.
-  * Vertices are created automatically and their values are set by applying the provided
-  * vertexValueInitializer map function to the vertex ids.
-  */
+   * Creates a graph from a DataSet of edges.
+   * Vertices are created automatically and their values are set by applying the provided
+   * vertexValueInitializer map function to the vertex ids.
+   */
   def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]],
   vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -68,8 +68,8 @@ object Graph {
   }
 
   /**
-  * Creates a Graph from a Seq of vertices and a Seq of edges.
-  */
+   * Creates a Graph from a Seq of vertices and a Seq of edges.
+   */
   def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env:
   ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -78,19 +78,19 @@ object Graph {
   }
 
   /**
-  * Creates a Graph from a Seq of edges.
-  * Vertices are created automatically and their values are set to NullValue.
-  */
+   * Creates a Graph from a Seq of edges.
+   * Vertices are created automatically and their values are set to NullValue.
+   */
   def fromCollection[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
   (edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
     wrapGraph(jg.Graph.fromCollection[K, EV](edges.asJavaCollection, env.getJavaEnv))
   }
 
   /**
-  * Creates a graph from a Seq of edges.
-  * Vertices are created automatically and their values are set by applying the provided
-  * vertexValueInitializer map function to the vertex ids.
-  */
+   * Creates a graph from a Seq of edges.
+   * Vertices are created automatically and their values are set by applying the provided
+   * vertexValueInitializer map function to the vertex ids.
+   */
   def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], vertexValueInitializer: MapFunction[K, VV],
   env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -105,7 +105,7 @@ object Graph {
    * The first field of the Tuple3 object for edges will become the source ID,
    * the second field will become the target ID, and the third field will become
    * the edge value. 
-  */
+   */
   def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
                               env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -116,12 +116,12 @@ object Graph {
   }
 
   /**
-  * Creates a Graph from a DataSet of Tuples representing the edges.
-  * The first field of the Tuple3 object for edges will become the source ID,
-  * the second field will become the target ID, and the third field will become
-  * the edge value. 
-  * Vertices are created automatically and their values are set to NullValue.
-  */
+   * Creates a Graph from a DataSet of Tuples representing the edges.
+   * The first field of the Tuple3 object for edges will become the source ID,
+   * the second field will become the target ID, and the third field will become
+   * the edge value.
+   * Vertices are created automatically and their values are set to NullValue.
+   */
   def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
   (edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
     val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
@@ -129,13 +129,13 @@ object Graph {
   }
 
   /**
-  * Creates a Graph from a DataSet of Tuples representing the edges.
-  * The first field of the Tuple3 object for edges will become the source ID,
-  * the second field will become the target ID, and the third field will become
-  * the edge value. 
-  * Vertices are created automatically and their values are set by applying the provided
-  * vertexValueInitializer map function to the vertex ids.
-  */
+   * Creates a Graph from a DataSet of Tuples representing the edges.
+   * The first field of the Tuple3 object for edges will become the source ID,
+   * the second field will become the target ID, and the third field will become
+   * the edge value.
+   * Vertices are created automatically and their values are set by applying the provided
+   * vertexValueInitializer map function to the vertex ids.
+   */
   def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](edges: DataSet[(K, K, EV)],
   vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = {
@@ -144,12 +144,12 @@ object Graph {
       env.getJavaEnv))
   }
 
-    /**
-  * Creates a Graph from a DataSet of Tuple2's representing the edges.
-  * The first field of the Tuple2 object for edges will become the source ID,
-  * the second field will become the target ID. The edge value will be set to NullValue.
-  * Vertices are created automatically and their values are set to NullValue.
-  */
+  /**
+   * Creates a Graph from a DataSet of Tuple2's representing the edges.
+   * The first field of the Tuple2 object for edges will become the source ID,
+   * the second field will become the target ID. The edge value will be set to NullValue.
+   * Vertices are created automatically and their values are set to NullValue.
+   */
   def fromTuple2DataSet[K: TypeInformation : ClassTag](edges: DataSet[(K, K)],
   env: ExecutionEnvironment): Graph[K, NullValue, NullValue] = {
     val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
@@ -157,12 +157,12 @@ object Graph {
   }
 
   /**
-  * Creates a Graph from a DataSet of Tuple2's representing the edges.
-  * The first field of the Tuple2 object for edges will become the source ID,
-  * the second field will become the target ID. The edge value will be set to NullValue.
-  * Vertices are created automatically and their values are set by applying the provided
-  * vertexValueInitializer map function to the vertex IDs.
-  */
+   * Creates a Graph from a DataSet of Tuple2's representing the edges.
+   * The first field of the Tuple2 object for edges will become the source ID,
+   * the second field will become the target ID. The edge value will be set to NullValue.
+   * Vertices are created automatically and their values are set by applying the provided
+   * vertexValueInitializer map function to the vertex IDs.
+   */
   def fromTuple2DataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag]
   (edges: DataSet[(K, K)], vertexValueInitializer: MapFunction[K, VV],
   env: ExecutionEnvironment): Graph[K, VV, NullValue] = {
@@ -172,52 +172,52 @@ object Graph {
   }
 
   /** Creates a Graph from a CSV file of edges.
-    *
-    * The edge value is read from the CSV file if [[EV]] is not of type [[NullValue]]. Otherwise the
-    * edge value is set to [[NullValue]].
-    *
-    * If the vertex value type [[VV]] is specified (unequal [[NullValue]]), then the vertex values
-    * are read from the file specified by pathVertices. If the path has not been specified then the
-    * vertexValueInitializer is used to initialize the vertex values of the vertices extracted from
-    * the set of edges. If the vertexValueInitializer has not been set either, then the method
-    * fails.
-    *
-    * @param env The Execution Environment.
-    * @param pathEdges The file path containing the edges.
-    * @param pathVertices The file path containing the vertices.
-    * @param lineDelimiterVertices The string that separates lines in the vertices file. It defaults
-    *                              to newline.
-    * @param fieldDelimiterVertices The string that separates vertex Ids from vertex values in the
-    *                               vertices file.
-    * @param quoteCharacterVertices The character to use for quoted String parsing in the vertices
-    *                               file. Disabled by default.
-    * @param ignoreFirstLineVertices Whether the first line in the vertices file should be ignored.
-    * @param ignoreCommentsVertices Lines that start with the given String in the vertices file
-    *                               are ignored, disabled by default.
-    * @param lenientVertices Whether the parser should silently ignore malformed lines in the
-    *                        vertices file.
-    * @param includedFieldsVertices The fields in the vertices file that should be read. By default
-    *                               all fields are read.
-    * @param lineDelimiterEdges The string that separates lines in the edges file. It defaults to
-    *                           newline.
-    * @param fieldDelimiterEdges The string that separates fields in the edges file.
-    * @param quoteCharacterEdges The character to use for quoted String parsing in the edges file.
-    *                            Disabled by default.
-    * @param ignoreFirstLineEdges Whether the first line in the vertices file should be ignored.
-    * @param ignoreCommentsEdges Lines that start with the given String in the edges file are
-    *                            ignored, disabled by default.
-    * @param lenientEdges Whether the parser should silently ignore malformed lines in the edges
-    *                     file.
-    * @param includedFieldsEdges The fields in the edges file that should be read. By default all
-    *                            fields are read.
-    * @param vertexValueInitializer  If no vertex values are provided, this mapper can be used to
-    *                                initialize them, by applying a map transformation on the vertex
-    *                                IDs.
-    * @tparam K Vertex key type
-    * @tparam VV Vertex value type
-    * @tparam EV Edge value type
-    * @return Graph with vertices and edges read from the given files.
-    */
+   *
+   * The edge value is read from the CSV file if [[EV]] is not of type [[NullValue]]. Otherwise the
+   * edge value is set to [[NullValue]].
+   *
+   * If the vertex value type [[VV]] is specified (unequal [[NullValue]]), then the vertex values
+   * are read from the file specified by pathVertices. If the path has not been specified then the
+   * vertexValueInitializer is used to initialize the vertex values of the vertices extracted from
+   * the set of edges. If the vertexValueInitializer has not been set either, then the method
+   * fails.
+   *
+   * @param env The Execution Environment.
+   * @param pathEdges The file path containing the edges.
+   * @param pathVertices The file path containing the vertices.
+   * @param lineDelimiterVertices The string that separates lines in the vertices file. It defaults
+   *                              to newline.
+   * @param fieldDelimiterVertices The string that separates vertex Ids from vertex values in the
+   *                               vertices file.
+   * @param quoteCharacterVertices The character to use for quoted String parsing in the vertices
+   *                               file. Disabled by default.
+   * @param ignoreFirstLineVertices Whether the first line in the vertices file should be ignored.
+   * @param ignoreCommentsVertices Lines that start with the given String in the vertices file
+   *                               are ignored, disabled by default.
+   * @param lenientVertices Whether the parser should silently ignore malformed lines in the
+   *                        vertices file.
+   * @param includedFieldsVertices The fields in the vertices file that should be read. By default
+   *                               all fields are read.
+   * @param lineDelimiterEdges The string that separates lines in the edges file. It defaults to
+   *                           newline.
+   * @param fieldDelimiterEdges The string that separates fields in the edges file.
+   * @param quoteCharacterEdges The character to use for quoted String parsing in the edges file.
+   *                            Disabled by default.
+   * @param ignoreFirstLineEdges Whether the first line in the vertices file should be ignored.
+   * @param ignoreCommentsEdges Lines that start with the given String in the edges file are
+   *                            ignored, disabled by default.
+   * @param lenientEdges Whether the parser should silently ignore malformed lines in the edges
+   *                     file.
+   * @param includedFieldsEdges The fields in the edges file that should be read. By default all
+   *                            fields are read.
+   * @param vertexValueInitializer  If no vertex values are provided, this mapper can be used to
+   *                                initialize them, by applying a map transformation on the vertex
+   *                                IDs.
+   * @tparam K Vertex key type
+   * @tparam VV Vertex value type
+   * @tparam EV Edge value type
+   * @return Graph with vertices and edges read from the given files.
+   */
   // scalastyle:off
   // This method exceeds the max allowed number of parameters -->  
   def fromCsvReader[
@@ -295,6 +295,7 @@ object Graph {
 
 /**
  * Represents a graph consisting of {@link Edge edges} and {@link Vertex vertices}.
+ *
  * @param jgraph the underlying java api Graph.
  * @tparam K the key type for vertex and edge identifiers
  * @tparam VV the value type for vertices
@@ -341,9 +342,9 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-  * @return a DataSet of Triplets,
-  * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
-  */
+   * @return a DataSet of Triplets,
+   * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
+   */
   def getTriplets(): DataSet[Triplet[K, VV, EV]] = {
     wrap(jgraph.getTriplets())
   }
@@ -418,11 +419,11 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-    * Translate vertex and edge IDs using the given function.
-    *
-    * @param fun implements conversion from K to NEW
-    * @return graph with translated vertex and edge IDs
-    */
+   * Translate vertex and edge IDs using the given function.
+   *
+   * @param fun implements conversion from K to NEW
+   * @return graph with translated vertex and edge IDs
+   */
   def translateGraphIds[NEW: TypeInformation : ClassTag](fun: (K, NEW) => NEW):
   Graph[NEW, VV, EV] = {
     val translator: TranslateFunction[K, NEW] = new TranslateFunction[K, NEW] {
@@ -446,11 +447,11 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-    * Translate vertex values using the given function.
-    *
-    * @param fun implements conversion from VV to NEW
-    * @return graph with translated vertex values
-    */
+   * Translate vertex values using the given function.
+   *
+   * @param fun implements conversion from VV to NEW
+   * @return graph with translated vertex values
+   */
   def translateVertexValues[NEW: TypeInformation : ClassTag](fun: (VV, NEW) => NEW):
   Graph[K, NEW, EV] = {
     val translator: TranslateFunction[VV, NEW] = new TranslateFunction[VV, NEW] {
@@ -474,11 +475,11 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-    * Translate edge values using the given function.
-    *
-    * @param fun implements conversion from EV to NEW
-    * @return graph with translated edge values
-    */
+   * Translate edge values using the given function.
+   *
+   * @param fun implements conversion from EV to NEW
+   * @return graph with translated edge values
+   */
   def translateEdgeValues[NEW: TypeInformation : ClassTag](fun: (EV, NEW) => NEW):
   Graph[K, VV, NEW] = {
     val translator: TranslateFunction[EV, NEW] = new TranslateFunction[EV, NEW] {
@@ -503,9 +504,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * of the matched Tuple2 from the input DataSet.
    * @return a new Graph, where the vertex values have been updated according to the
    * result of the vertexJoinFunction.
-   * 
    * @tparam T the type of the second field of the input Tuple2 DataSet.
-  */
+   */
   def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)],
   vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV] = {
     val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
@@ -526,9 +526,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * of the matched Tuple2 from the input DataSet.
    * @return a new Graph, where the vertex values have been updated according to the
    * result of the vertexJoinFunction.
-   * 
    * @tparam T the type of the second field of the input Tuple2 DataSet.
-  */
+   */
   def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV):
   Graph[K, VV, EV] = {
     val newVertexJoin = new VertexJoinFunction[VV, T]() {
@@ -554,11 +553,10 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @param edgeJoinFunction the transformation function to apply.
    * The first parameter is the current edge value and the second parameter is the value
    * of the matched Tuple3 from the input DataSet.
-   * 
    * @tparam T the type of the third field of the input Tuple3 DataSet.
    * @return a new Graph, where the edge values have been updated according to the
    * result of the edgeJoinFunction.
-  */
+   */
   def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)],
   edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
     val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
@@ -577,11 +575,10 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @param fun the transformation function to apply.
    * The first parameter is the current edge value and the second parameter is the value
    * of the matched Tuple3 from the input DataSet.
-   * 
    * @tparam T the type of the third field of the input Tuple3 DataSet.
    * @return a new Graph, where the edge values have been updated according to the
    * result of the edgeJoinFunction.
-  */
+   */
   def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: (EV, T) => EV):
   Graph[K, VV, EV] = {
     val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
@@ -611,7 +608,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @tparam T the type of the second field of the input Tuple2 DataSet.
    * @return a new Graph, where the edge values have been updated according to the
    * result of the edgeJoinFunction.
-  */
+   */
   def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)],
   edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
     val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
@@ -634,7 +631,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @tparam T the type of the second field of the input Tuple2 DataSet.
    * @return a new Graph, where the edge values have been updated according to the
    * result of the edgeJoinFunction.
-  */
+   */
   def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) =>
     EV): Graph[K, VV, EV] = {
     val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
@@ -664,7 +661,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @tparam T the type of the second field of the input Tuple2 DataSet.
    * @return a new Graph, where the edge values have been updated according to the
    * result of the edgeJoinFunction.
-  */
+   */
   def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)],
   edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
     val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
@@ -687,7 +684,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @tparam T the type of the second field of the input Tuple2 DataSet.
    * @return a new Graph, where the edge values have been updated according to the
    * result of the edgeJoinFunction.
-  */
+   */
   def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) =>
     EV): Graph[K, VV, EV] = {
     val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
@@ -945,30 +942,30 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-  * Adds the list of vertices, passed as input, to the graph.
-  * If the vertices already exist in the graph, they will not be added once more.
-  *
-  * @param vertices the list of vertices to add
-  * @return the new graph containing the existing and newly added vertices
-  */
+   * Adds the list of vertices, passed as input, to the graph.
+   * If the vertices already exist in the graph, they will not be added once more.
+   *
+   * @param vertices the list of vertices to add
+   * @return the new graph containing the existing and newly added vertices
+   */
   def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
     wrapGraph(jgraph.addVertices(vertices.asJava))
   }
 
   /**
-  * Adds the given list edges to the graph.
-  *
-  * When adding an edge for a non-existing set of vertices,
-  * the edge is considered invalid and ignored.
-  *
-  * @param edges the data set of edges to be added
-  * @return a new graph containing the existing edges plus the newly added edges.
-  */
+   * Adds the given list edges to the graph.
+   *
+   * When adding an edge for a non-existing set of vertices,
+   * the edge is considered invalid and ignored.
+   *
+   * @param edges the data set of edges to be added
+   * @return a new graph containing the existing edges plus the newly added edges.
+   */
   def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
     wrapGraph(jgraph.addEdges(edges.asJava))
   }
 
-    /**
+  /**
    * Adds the given edge to the graph. If the source and target vertices do
    * not exist in the graph, they will also be added.
    *
@@ -993,7 +990,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
     wrapGraph(jgraph.removeVertex(vertex))
   }
 
-    /**
+  /**
    * Removes the given vertex and its edges from the graph.
    *
    * @param vertices list of vertices to remove
@@ -1037,12 +1034,13 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-  * Performs Difference on the vertex and edge sets of the input graphs
-  * removes common vertices and edges. If a source/target vertex is removed,
-  * its corresponding edge will also be removed
-  * @param graph the graph to perform difference with
-  * @return a new graph where the common vertices and edges have been removed
-  */
+   * Performs Difference on the vertex and edge sets of the input graphs
+   * removes common vertices and edges. If a source/target vertex is removed,
+   * its corresponding edge will also be removed
+   *
+   * @param graph the graph to perform difference with
+   * @return a new graph where the common vertices and edges have been removed
+   */
   def difference(graph: Graph[K, VV, EV]) = {
     wrapGraph(jgraph.difference(graph.getWrappedGraph))
   }
@@ -1135,36 +1133,34 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * Runs a scatter-gather iteration on the graph.
    * No configuration options are provided.
    *
-   * @param vertexUpdateFunction the vertex update function
-   * @param messagingFunction the messaging function
+   * @param scatterFunction the scatter function
+   * @param gatherFunction the gather function
    * @param maxIterations maximum number of iterations to perform
-   *
    * @return the updated Graph after the scatter-gather iteration has converged or
    *         after maximumNumberOfIterations.
    */
-  def runScatterGatherIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
-                                   messagingFunction: MessagingFunction[K, VV, M, EV],
+  def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
+                                   gatherFunction: SpargelGatherFunction[K, VV, M],
                                    maxIterations: Int): Graph[K, VV, EV] = {
-    wrapGraph(jgraph.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
+    wrapGraph(jgraph.runScatterGatherIteration(scatterFunction, gatherFunction,
       maxIterations))
   }
 
   /**
    * Runs a scatter-gather iteration on the graph with configuration options.
    *
-   * @param vertexUpdateFunction the vertex update function
-   * @param messagingFunction the messaging function
+   * @param scatterFunction the scatter function
+   * @param gatherFunction the gather function
    * @param maxIterations maximum number of iterations to perform
    * @param parameters the iteration configuration parameters
-   *
    * @return the updated Graph after the scatter-gather iteration has converged or
    *         after maximumNumberOfIterations.
    */
-  def runScatterGatherIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
-                                   messagingFunction: MessagingFunction[K, VV, M, EV],
+  def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
+                                   gatherFunction: SpargelGatherFunction[K, VV, M],
                                    maxIterations: Int, parameters: ScatterGatherConfiguration):
   Graph[K, VV, EV] = {
-    wrapGraph(jgraph.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
+    wrapGraph(jgraph.runScatterGatherIteration(scatterFunction, gatherFunction,
       maxIterations, parameters))
   }
 
@@ -1178,11 +1174,10 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @param applyFunction the apply function updates the vertex values with the aggregates
    * @param maxIterations maximum number of iterations to perform
    * @tparam M the intermediate type used between gather, sum and apply
-   *
    * @return the updated Graph after the gather-sum-apply iteration has converged or
    *         after maximumNumberOfIterations.
    */
-  def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction:
+  def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M], sumFunction:
   SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int): Graph[K,
     VV, EV] = {
     wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
@@ -1199,25 +1194,23 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @param maxIterations maximum number of iterations to perform
    * @param parameters the iteration configuration parameters
    * @tparam M the intermediate type used between gather, sum and apply
-   *
    * @return the updated Graph after the gather-sum-apply iteration has converged or
    *         after maximumNumberOfIterations.
    */
-  def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction:
+  def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M], sumFunction:
   SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int,
                                     parameters: GSAConfiguration): Graph[K, VV, EV] = {
     wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
       maxIterations, parameters))
   }
 
-   /**
+  /**
    * Runs a vertex-centric iteration on the graph.
    * No configuration options are provided.
    *
    * @param computeFunction the compute function
    * @param combineFunction the optional message combiner function
    * @param maxIterations maximum number of iterations to perform
-   *
    * @return the updated Graph after the vertex-centric iteration has converged or
    *         after maximumNumberOfIterations.
    */
@@ -1235,7 +1228,6 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @param combineFunction the optional message combiner function
    * @param maxIterations maximum number of iterations to perform
    * @param parameters the iteration configuration parameters
-   *
    * @return the updated Graph after the vertex-centric iteration has converged or
    *         after maximumNumberOfIterations.
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index fe59283..821b0a7 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -45,17 +45,15 @@ import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.asm.translate.TranslateVertexValues;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GSAConfiguration;
-import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.GatherSumApplyIteration;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.pregel.ComputeFunction;
 import org.apache.flink.graph.pregel.MessageCombiner;
 import org.apache.flink.graph.pregel.VertexCentricConfiguration;
 import org.apache.flink.graph.pregel.VertexCentricIteration;
-import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
 import org.apache.flink.graph.spargel.ScatterGatherIteration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.EdgeToTuple3Map;
 import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
@@ -1652,27 +1650,27 @@ public class Graph<K, VV, EV> {
 	 * Runs a ScatterGather iteration on the graph.
 	 * No configuration options are provided.
 	 *
-	 * @param vertexUpdateFunction the vertex update function
-	 * @param messagingFunction the messaging function
+	 * @param scatterFunction the scatter function
+	 * @param gatherFunction the gather function
 	 * @param maximumNumberOfIterations maximum number of iterations to perform
 	 * 
 	 * @return the updated Graph after the scatter-gather iteration has converged or
 	 * after maximumNumberOfIterations.
 	 */
 	public <M> Graph<K, VV, EV> runScatterGatherIteration(
-			VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
-			MessagingFunction<K, VV, M, EV> messagingFunction,
+			ScatterFunction<K, VV, M, EV> scatterFunction,
+			org.apache.flink.graph.spargel.GatherFunction<K, VV, M> gatherFunction,
 			int maximumNumberOfIterations) {
 
-		return this.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
+		return this.runScatterGatherIteration(scatterFunction, gatherFunction,
 				maximumNumberOfIterations, null);
 	}
 
 	/**
 	 * Runs a ScatterGather iteration on the graph with configuration options.
-	 * 
-	 * @param vertexUpdateFunction the vertex update function
-	 * @param messagingFunction the messaging function
+	 *
+	 * @param scatterFunction the scatter function
+	 * @param gatherFunction the gather function
 	 * @param maximumNumberOfIterations maximum number of iterations to perform
 	 * @param parameters the iteration configuration parameters
 	 * 
@@ -1680,12 +1678,12 @@ public class Graph<K, VV, EV> {
 	 * after maximumNumberOfIterations.
 	 */
 	public <M> Graph<K, VV, EV> runScatterGatherIteration(
-			VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
-			MessagingFunction<K, VV, M, EV> messagingFunction,
+			ScatterFunction<K, VV, M, EV> scatterFunction,
+			org.apache.flink.graph.spargel.GatherFunction<K, VV, M> gatherFunction,
 			int maximumNumberOfIterations, ScatterGatherConfiguration parameters) {
 
 		ScatterGatherIteration<K, VV, M, EV> iteration = ScatterGatherIteration.withEdges(
-				edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
+				edges, scatterFunction, gatherFunction, maximumNumberOfIterations);
 
 		iteration.configure(parameters);
 
@@ -1708,7 +1706,7 @@ public class Graph<K, VV, EV> {
 	 * after maximumNumberOfIterations.
 	 */
 	public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
-			GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
+			org.apache.flink.graph.gsa.GatherFunction gatherFunction, SumFunction<VV, EV, M> sumFunction,
 			ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
 
 		return this.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
@@ -1729,7 +1727,7 @@ public class Graph<K, VV, EV> {
 	 * after maximumNumberOfIterations.
 	 */
 	public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
-			GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
+			org.apache.flink.graph.gsa.GatherFunction gatherFunction, SumFunction<VV, EV, M> sumFunction,
 			ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations,
 			GSAConfiguration parameters) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
index 7578420..0b98a27 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -133,8 +134,8 @@ public abstract class IterationConfiguration {
 
 	/**
 	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
-	 * via {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getIterationAggregator(String)} and
-	 * {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getPreviousIterationAggregate(String)}.
+	 * via {@link GatherFunction#getIterationAggregator(String)} and
+	 * {@link GatherFunction#getPreviousIterationAggregate(String)}.
 	 * 
 	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
 	 * @param aggregator The aggregator.

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
index 4d68661..f554680 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
@@ -26,9 +26,9 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 
 import java.util.Map;
 import java.util.TreeMap;
@@ -73,19 +73,33 @@ public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Gr
 	public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
 
 		DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = graph.getVertices()
-				.map(new AddScoreToVertexValuesMapper<K>());
+			.map(new AddScoreToVertexValuesMapper<K>());
 
 		Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
-				Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();
+			Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();
 
-		return graphWithScoredVertices.runScatterGatherIteration(new VertexLabelUpdater<K>(delta),
-				new LabelMessenger<K>(), maxIterations)
+		return graphWithScoredVertices.runScatterGatherIteration(new LabelMessenger<K>(),
+			new VertexLabelUpdater<K>(delta), maxIterations)
 				.mapVertices(new RemoveScoreFromVertexValuesMapper<K>());
 	}
 
 	@SuppressWarnings("serial")
-	public static final class VertexLabelUpdater<K> extends VertexUpdateFunction<
-		K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+	public static final class LabelMessenger<K> extends ScatterFunction<K, Tuple2<Long, Double>,
+			Tuple2<Long, Double>, Double> {
+
+		@Override
+		public void sendMessages(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
+
+			for(Edge<K, Double> edge : getEdges()) {
+				sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
+					vertex.getValue().f1 * edge.getValue()));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class VertexLabelUpdater<K> extends GatherFunction<
+			K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
 
 		private Double delta;
 
@@ -154,27 +168,13 @@ public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Gr
 	}
 
 	@SuppressWarnings("serial")
-	public static final class LabelMessenger<K> extends MessagingFunction<K, Tuple2<Long, Double>,
-			Tuple2<Long, Double>, Double> {
-
-		@Override
-		public void sendMessages(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
-
-			for(Edge<K, Double> edge : getEdges()) {
-				sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
-						vertex.getValue().f1 * edge.getValue()));
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
 	@ForwardedFields("f0")
 	public static final class AddScoreToVertexValuesMapper<K> implements MapFunction<
 		Vertex<K, Long>, Vertex<K, Tuple2<Long, Double>>> {
 
 		public Vertex<K, Tuple2<Long, Double>> map(Vertex<K, Long> vertex) {
 			return new Vertex<K, Tuple2<Long, Double>>(
-					vertex.getId(), new Tuple2<Long, Double>(vertex.getValue(), 1.0));
+				vertex.getId(), new Tuple2<Long, Double>(vertex.getValue(), 1.0));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index efc32c1..3cd8f05 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 
@@ -76,39 +76,16 @@ public class ConnectedComponents<K, VV extends Comparable<VV>, EV>
 			.getUndirected();
 
 		return undirectedGraph.runScatterGatherIteration(
-			new CCUpdater<K, VV>(),
 			new CCMessenger<K, VV>(valueTypeInfo),
+			new CCUpdater<K, VV>(),
 			maxIterations).getVertices();
 	}
 
 	/**
-	 * Updates the value of a vertex by picking the minimum neighbor value out of all the incoming messages.
-	 */
-	public static final class CCUpdater<K, VV extends Comparable<VV>>
-		extends VertexUpdateFunction<K, VV, VV> {
-
-		@Override
-		public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messages) throws Exception {
-			VV current = vertex.getValue();
-			VV min = current;
-
-			for (VV msg : messages) {
-				if (msg.compareTo(min) < 0) {
-					min = msg;
-				}
-			}
-
-			if (!min.equals(current)) {
-				setNewVertexValue(min);
-			}
-		}
-	}
-
-	/**
 	 * Sends the current vertex value to all adjacent vertices.
 	 */
 	public static final class CCMessenger<K, VV extends Comparable<VV>>
-		extends MessagingFunction<K, VV, VV, NullValue>
+		extends ScatterFunction<K, VV, VV, NullValue>
 		implements ResultTypeQueryable<VV> {
 
 		private final TypeInformation<VV> typeInformation;
@@ -128,4 +105,27 @@ public class ConnectedComponents<K, VV extends Comparable<VV>, EV>
 			return typeInformation;
 		}
 	}
+
+	/**
+	 * Updates the value of a vertex by picking the minimum neighbor value out of all the incoming messages.
+	 */
+	public static final class CCUpdater<K, VV extends Comparable<VV>>
+		extends GatherFunction<K, VV, VV> {
+
+		@Override
+		public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messages) throws Exception {
+			VV current = vertex.getValue();
+			VV min = current;
+
+			for (VV msg : messages) {
+				if (msg.compareTo(min) < 0) {
+					min = msg;
+				}
+			}
+
+			if (!min.equals(current)) {
+				setNewVertexValue(min);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index a12cb20..327de73 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -27,8 +27,8 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
index 5899fa0..f39d858 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -25,8 +25,8 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
 
 /**
  * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index fef6808..2d13dfd 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 
@@ -78,15 +78,38 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV>
 		return input
 			.mapEdges(new NullValueEdgeMapper<K, EV>())
 			.runScatterGatherIteration(
-				new UpdateVertexLabel<K, VV>(), new SendNewLabelToNeighbors<K, VV>(valueType), maxIterations)
+				new SendNewLabelToNeighbors<K, VV>(valueType), new UpdateVertexLabel<K, VV>(), maxIterations)
 			.getVertices();
 	}
 
 	/**
+	 * Sends the vertex label to all out-neighbors
+	 */
+	public static final class SendNewLabelToNeighbors<K, VV extends Comparable<VV>>
+		extends ScatterFunction<K, VV, VV, NullValue>
+		implements ResultTypeQueryable<VV> {
+
+		private final TypeInformation<VV> typeInformation;
+
+		public SendNewLabelToNeighbors(TypeInformation<VV> typeInformation) {
+			this.typeInformation = typeInformation;
+		}
+
+		public void sendMessages(Vertex<K, VV> vertex) {
+			sendMessageToAllNeighbors(vertex.getValue());
+		}
+
+		@Override
+		public TypeInformation<VV> getProducedType() {
+			return typeInformation;
+		}
+	}
+
+	/**
 	 * Function that updates the value of a vertex by adopting the most frequent
 	 * label among its in-neighbors
 	 */
-	public static final class UpdateVertexLabel<K, VV extends Comparable<VV>> extends VertexUpdateFunction<K, VV, VV> {
+	public static final class UpdateVertexLabel<K, VV extends Comparable<VV>> extends GatherFunction<K, VV, VV> {
 
 		public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> inMessages) {
 			Map<VV, Long> labelsWithFrequencies = new HashMap<VV, Long>();
@@ -119,27 +142,4 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV>
 			setNewVertexValue(mostFrequentLabel);
 		}
 	}
-
-	/**
-	 * Sends the vertex label to all out-neighbors
-	 */
-	public static final class SendNewLabelToNeighbors<K, VV extends Comparable<VV>>
-		extends MessagingFunction<K, VV, VV, NullValue>
-		implements ResultTypeQueryable<VV> {
-
-		private final TypeInformation<VV> typeInformation;
-
-		public SendNewLabelToNeighbors(TypeInformation<VV> typeInformation) {
-			this.typeInformation = typeInformation;
-		}
-
-		public void sendMessages(Vertex<K, VV> vertex) {
-			sendMessageToAllNeighbors(vertex.getValue());
-		}
-
-		@Override
-		public TypeInformation<VV> getProducedType() {
-			return typeInformation;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 2f1b03b..bf9b4e9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -25,10 +25,10 @@ import org.apache.flink.graph.EdgeJoinFunction;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.types.LongValue;
 
 /**
@@ -65,18 +65,37 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
 		parameters.setOptNumVertices(true);
 
-		return networkWithWeights.runScatterGatherIteration(new VertexRankUpdater<K>(beta),
-				new RankMessenger<K>(), maxIterations, parameters)
+		return networkWithWeights.runScatterGatherIteration(new RankMessenger<K>(),
+				new VertexRankUpdater<K>(beta), maxIterations, parameters)
 				.getVertices();
 	}
 
 	/**
+	 * Distributes the rank of a vertex among all target vertices according to
+	 * the transition probability, which is associated with an edge as the edge
+	 * value.
+	 */
+	@SuppressWarnings("serial")
+	public static final class RankMessenger<K> extends ScatterFunction<K, Double, Double, Double> {
+		@Override
+		public void sendMessages(Vertex<K, Double> vertex) {
+			if (getSuperstepNumber() == 1) {
+				// initialize vertex ranks
+				vertex.setValue(1.0 / this.getNumberOfVertices());
+			}
+
+			for (Edge<K, Double> edge : getEdges()) {
+				sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
+			}
+		}
+	}
+
+	/**
 	 * Function that updates the rank of a vertex by summing up the partial
 	 * ranks from all incoming messages and then applying the dampening formula.
 	 */
 	@SuppressWarnings("serial")
-	public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
-
+	public static final class VertexRankUpdater<K> extends GatherFunction<K, Double, Double> {
 		private final double beta;
 
 		public VertexRankUpdater(double beta) {
@@ -96,30 +115,8 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 		}
 	}
 
-	/**
-	 * Distributes the rank of a vertex among all target vertices according to
-	 * the transition probability, which is associated with an edge as the edge
-	 * value.
-	 */
-	@SuppressWarnings("serial")
-	public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
-		@Override
-		public void sendMessages(Vertex<K, Double> vertex) {
-			if (getSuperstepNumber() == 1) {
-				// initialize vertex ranks
-				vertex.setValue(1.0 / this.getNumberOfVertices());
-			}
-
-			for (Edge<K, Double> edge : getEdges()) {
-				sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
-			}
-		}
-	}
-
 	@SuppressWarnings("serial")
 	private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
-
 		public Double edgeJoin(Double edgeValue, LongValue inputValue) {
 			return edgeValue / (double) inputValue.getValue();
 		}


Mime
View raw message