flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [3/3] flink git commit: [FLINK-3208] [gelly] rename vertex-centric iteration model to scatter-gather
Date Tue, 02 Feb 2016 10:08:29 GMT
[FLINK-3208] [gelly] rename vertex-centric iteration model to scatter-gather

This closes #1514


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8677464
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8677464
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8677464

Branch: refs/heads/master
Commit: f8677464b0c7487e2882e2d4950e5f4b8c488951
Parents: 233c014
Author: vasia <vasia@apache.org>
Authored: Fri Jan 15 20:08:04 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Tue Feb 2 10:17:29 2016 +0100

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        | 104 +--
 .../org/apache/flink/graph/scala/Graph.scala    |  20 +-
 .../example/SingleSourceShortestPaths.scala     |   6 +-
 .../main/java/org/apache/flink/graph/Graph.java |  22 +-
 .../org/apache/flink/graph/GraphCsvReader.java  |   1 -
 .../flink/graph/example/IncrementalSSSP.java    |  10 +-
 .../example/SingleSourceShortestPaths.java      |   8 +-
 .../flink/graph/library/CommunityDetection.java |   2 +-
 .../graph/library/ConnectedComponents.java      |   6 +-
 .../flink/graph/library/LabelPropagation.java   |   2 +-
 .../apache/flink/graph/library/PageRank.java    |   4 +-
 .../library/SingleSourceShortestPaths.java      |   4 +-
 .../flink/graph/spargel/MessagingFunction.java  |   6 +-
 .../spargel/ScatterGatherConfiguration.java     | 135 ++++
 .../graph/spargel/ScatterGatherIteration.java   | 685 ++++++++++++++++
 .../spargel/VertexCentricConfiguration.java     | 135 ----
 .../graph/spargel/VertexCentricIteration.java   | 686 ----------------
 .../graph/spargel/VertexUpdateFunction.java     |   6 +-
 .../graph/spargel/SpargelCompilerTest.java      |   7 +-
 .../graph/spargel/SpargelTranslationTest.java   |   8 +-
 .../test/CollectionModeSuperstepITCase.java     |   2 +-
 .../test/ScatterGatherConfigurationITCase.java  | 796 +++++++++++++++++++
 .../test/VertexCentricConfigurationITCase.java  | 796 -------------------
 .../test/example/IncrementalSSSPITCase.java     |   8 +-
 24 files changed, 1728 insertions(+), 1731 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index fc8dbbf..ccee319 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -790,28 +790,28 @@ When the aggregation computation does not require access to the vertex value (fo
 
 Iterative Graph Processing
 -----------
-Gelly exploits Flink's efficient iteration operators to support large-scale iterative graph processing. Currently, we provide implementations of the popular vertex-centric iterative model and a variation of Gather-Sum-Apply. In the following sections, we describe these models and show how you can use them in Gelly.
+Gelly exploits Flink's efficient iteration operators to support large-scale iterative graph processing. Currently, we provide implementations of the popular scatter-gather iterative model and a variation of Gather-Sum-Apply. In the following sections, we describe these models and show how you can use them in Gelly.
 
-### Vertex-centric Iterations
-The vertex-centric model, also known as "think like a vertex" model, expresses computation from the perspective of a vertex in the graph. The computation proceeds in synchronized iteration steps, called supersteps. In each superstep, a vertex produces messages for other vertices and updates its value based on the messages it receives. To use vertex-centric iterations in Gelly, the user only needs to define how a vertex behaves in each superstep:
+### Scatter-Gather Iterations
+The scatter-gather model, also known as "signal/collect" model, expresses computation from the perspective of a vertex in the graph. The computation proceeds in synchronized iteration steps, called supersteps. In each superstep, a vertex produces messages for other vertices and updates its value based on the messages it receives. To use scatter-gather iterations in Gelly, the user only needs to define how a vertex behaves in each superstep:
 
-* <strong>Messaging</strong>:  produce the messages that a vertex will send to other vertices.
-* <strong>Value Update</strong>: update the vertex value using the received messages.
+* <strong>Messaging</strong>:  corresponds to the scatter phase and produces the messages that a vertex will send to other vertices.
+* <strong>Value Update</strong>: corresponds to the gather phase and updates the vertex value using the received messages.
 
-Gelly provides methods for vertex-centric iterations. The user only needs to implement two functions, corresponding to the phases above: a `VertexUpdateFunction`, which defines how a vertex will update its value based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
-These functions and the maximum number of iterations to run are given as parameters to Gelly's `runVertexCentricIteration`. This method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values.
+Gelly provides methods for scatter-gather iterations. The user only needs to implement two functions, corresponding to the scatter and gather phases. The first function is a `MessagingFunction`, which allows a vertex to send out messages for other vertices. Messages are recieved during the same superstep as they are sent. The second function is `VertexUpdateFunction`, which defines how a vertex will update its value based on the received messages.
+These functions and the maximum number of iterations to run are given as parameters to Gelly's `runScatterGatherIteration`. This method will execute the scatter-gather iteration on the input Graph and return a new Graph, with updated vertex values.
 
-A vertex-centric iteration can be extended with information such as the total number of vertices, the in degree and out degree.
-Additionally, the  neighborhood type (in/out/all) over which to run the vertex-centric iteration can be specified. By default, the updates from the in-neighbors are used to modify the current vertex's state and messages are sent to out-neighbors.
+A scatter-gather iteration can be extended with information such as the total number of vertices, the in degree and out degree.
+Additionally, the  neighborhood type (in/out/all) over which to run the scatter-gather iteration can be specified. By default, the updates from the in-neighbors are used to modify the current vertex's state and messages are sent to out-neighbors.
 
-Let us consider computing Single-Source-Shortest-Paths with vertex-centric iterations on the following graph and let vertex 1 be the source. In each superstep, each vertex sends a candidate distance message to all its neighbors. The message value is the sum of the current value of the vertex and the edge weight connecting this vertex with its neighbor. Upon receiving candidate distance messages, each vertex calculates the minimum distance and, if a shorter path has been discovered, it updates its value. If a vertex does not change its value during a superstep, then it does not produce messages for its neighbors for the next superstep. The algorithm converges when there are no value updates.
+Let us consider computing Single-Source-Shortest-Paths with scatter-gather iterations on the following graph and let vertex 1 be the source. In each superstep, each vertex sends a candidate distance message to all its neighbors. The message value is the sum of the current value of the vertex and the edge weight connecting this vertex with its neighbor. Upon receiving candidate distance messages, each vertex calculates the minimum distance and, if a shorter path has been discovered, it updates its value. If a vertex does not change its value during a superstep, then it does not produce messages for its neighbors for the next superstep. The algorithm converges when there are no value updates.
 
 <p class="text-center">
-    <img alt="Vertex-centric SSSP superstep 1" width="70%" src="fig/gelly-vc-sssp1.png"/>
+    <img alt="Scatter-gather SSSP superstep 1" width="70%" src="fig/gelly-vc-sssp1.png"/>
 </p>
 
 <p class="text-center">
-    <img alt="Vertex-centric SSSP superstep 2" width="70%" src="fig/gelly-vc-sssp2.png"/>
+    <img alt="Scatter-gather SSSP superstep 2" width="70%" src="fig/gelly-vc-sssp2.png"/>
 </p>
 
 <div class="codetabs" markdown="1">
@@ -823,8 +823,8 @@ Graph<Long, Double, Double> graph = ...
 // define the maximum number of iterations
 int maxIterations = 10;
 
-// Execute the vertex-centric iteration
-Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
+// Execute the scatter-gather iteration
+Graph<Long, Double, Double> result = graph.runScatterGatherIteration(
 			new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
 
 // Extract the vertices as the result
@@ -833,7 +833,7 @@ DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
 
 // - - -  UDFs - - - //
 
-// messaging
+// scatter: messaging
 public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> {
 
 	public void sendMessages(Vertex<Long, Double> vertex) {
@@ -843,7 +843,7 @@ public static final class MinDistanceMessenger extends MessagingFunction<Long, D
 	}
 }
 
-// vertex update
+// gather: vertex update
 public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
 
 	public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
@@ -872,8 +872,8 @@ val graph: Graph[Long, Double, Double] = ...
 // define the maximum number of iterations
 val maxIterations = 10
 
-// Execute the vertex-centric iteration
-val result = graph.runVertexCentricIteration(new VertexDistanceUpdater, new MinDistanceMessenger, maxIterations)
+// Execute the scatter-gather iteration
+val result = graph.runScatterGatherIteration(new VertexDistanceUpdater, new MinDistanceMessenger, maxIterations)
 
 // Extract the vertices as the result
 val singleSourceShortestPaths = result.getVertices
@@ -915,11 +915,11 @@ final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Dou
 
 {% top %}
 
-### Configuring a Vertex-Centric Iteration
-A vertex-centric iteration can be configured using a `VertexCentricConfiguration` object.
+### Configuring a Scatter-Gather Iteration
+A scatter-gather iteration can be configured using a `ScatterGatherConfiguration` object.
 Currently, the following parameters can be specified:
 
-* <strong>Name</strong>: The name for the vertex-centric iteration. The name is displayed in logs and messages
+* <strong>Name</strong>: The name for the scatter-gather iteration. The name is displayed in logs and messages
 and can be specified using the `setName()` method.
 
 * <strong>Parallelism</strong>: The parallelism for the iteration. It can be set using the `setParallelism()` method.
@@ -947,7 +947,7 @@ If the degrees option is not set in the configuration, these methods will return
 Graph<Long, Double, Double> graph = ...
 
 // configure the iteration
-VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
 
 // set the iteration name
 parameters.setName("Gelly Iteration");
@@ -958,9 +958,9 @@ parameters.setParallelism(16);
 // register an aggregator
 parameters.registerAggregator("sumAggregator", new LongSumAggregator());
 
-// run the vertex-centric iteration, also passing the configuration parameters
+// run the scatter-gather iteration, also passing the configuration parameters
 Graph<Long, Double, Double> result =
-			graph.runVertexCentricIteration(
+			graph.runScatterGatherIteration(
 			new VertexUpdater(), new Messenger(), maxIterations, parameters);
 
 // user-defined functions
@@ -998,7 +998,7 @@ public static final class Messenger extends MessagingFunction {...}
 
 val graph: Graph[Long, Double, Double] = ...
 
-val parameters = new VertexCentricConfiguration
+val parameters = new ScatterGatherConfiguration
 
 // set the iteration name
 parameters.setName("Gelly Iteration")
@@ -1009,8 +1009,8 @@ parameters.setParallelism(16)
 // register an aggregator
 parameters.registerAggregator("sumAggregator", new LongSumAggregator)
 
-// run the vertex-centric iteration, also passing the configuration parameters
-val result = graph.runVertexCentricIteration(new VertexUpdater, new Messenger, maxIterations, parameters)
+// run the scatter-gather iteration, also passing the configuration parameters
+val result = graph.runScatterGatherIteration(new VertexUpdater, new Messenger, maxIterations, parameters)
 
 // user-defined functions
 final class VertexUpdater extends VertexUpdateFunction {
@@ -1052,7 +1052,7 @@ The following example illustrates the usage of the degree as well as the number
 Graph<Long, Double, Double> graph = ...
 
 // configure the iteration
-VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
 
 // set the number of vertices option to true
 parameters.setOptNumVertices(true);
@@ -1060,9 +1060,9 @@ parameters.setOptNumVertices(true);
 // set the degree option to true
 parameters.setOptDegrees(true);
 
-// run the vertex-centric iteration, also passing the configuration parameters
+// run the scatter-gather iteration, also passing the configuration parameters
 Graph<Long, Double, Double> result =
-			graph.runVertexCentricIteration(
+			graph.runScatterGatherIteration(
 			new VertexUpdater(), new Messenger(), maxIterations, parameters);
 
 // user-defined functions
@@ -1089,7 +1089,7 @@ public static final class Messenger {
 val graph: Graph[Long, Double, Double] = ...
 
 // configure the iteration
-val parameters = new VertexCentricConfiguration
+val parameters = new ScatterGatherConfiguration
 
 // set the number of vertices option to true
 parameters.setOptNumVertices(true)
@@ -1097,8 +1097,8 @@ parameters.setOptNumVertices(true)
 // set the degree option to true
 parameters.setOptDegrees(true)
 
-// run the vertex-centric iteration, also passing the configuration parameters
-val result = graph.runVertexCentricIteration(new VertexUpdater, new Messenger, maxIterations, parameters)
+// run the scatter-gather iteration, also passing the configuration parameters
+val result = graph.runScatterGatherIteration(new VertexUpdater, new Messenger, maxIterations, parameters)
 
 // user-defined functions
 final class VertexUpdater {
@@ -1127,14 +1127,14 @@ The following example illustrates the usage of the edge direction option. Vertic
 Graph<Long, HashSet<Long>, Double> graph = ...
 
 // configure the iteration
-VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
 
 // set the messaging direction
 parameters.setDirection(EdgeDirection.IN);
 
-// run the vertex-centric iteration, also passing the configuration parameters
+// run the scatter-gather iteration, also passing the configuration parameters
 DataSet<Vertex<Long, HashSet<Long>>> result =
-			graph.runVertexCentricIteration(
+			graph.runScatterGatherIteration(
 			new VertexUpdater(), new Messenger(), maxIterations, parameters)
 			.getVertices();
 
@@ -1151,13 +1151,13 @@ public static final class Messenger {...}
 val graph: Graph[Long, HashSet[Long], Double] = ...
 
 // configure the iteration
-val parameters = new VertexCentricConfiguration
+val parameters = new ScatterGatherConfiguration
 
 // set the messaging direction
 parameters.setDirection(EdgeDirection.IN)
 
-// run the vertex-centric iteration, also passing the configuration parameters
-val result = graph.runVertexCentricIteration(new VertexUpdater, new Messenger, maxIterations, parameters)
+// run the scatter-gather iteration, also passing the configuration parameters
+val result = graph.runScatterGatherIteration(new VertexUpdater, new Messenger, maxIterations, parameters)
 			.getVertices
 
 // user-defined functions
@@ -1172,7 +1172,7 @@ final class Messenger {...}
 {% top %}
 
 ### Gather-Sum-Apply Iterations
-Like in the vertex-centric model, Gather-Sum-Apply also proceeds in synchronized iterative steps, called supersteps. Each superstep consists of the following three phases:
+Like in the scatter-gather model, Gather-Sum-Apply also proceeds in synchronized iterative steps, called supersteps. Each superstep consists of the following three phases:
 
 * <strong>Gather</strong>: a user-defined function is invoked in parallel on the edges and neighbors of each vertex, producing a partial value.
 * <strong>Sum</strong>: the partial values produced in the Gather phase are aggregated to a single value, using a user-defined reducer.
@@ -1442,16 +1442,16 @@ val result = graph.runGatherSumApplyIteration(new Gather, new Sum, new Apply, ma
 </div>
 {% top %}
 
-### Vertex-centric and GSA Comparison
-As seen in the examples above, Gather-Sum-Apply iterations are quite similar to vertex-centric iterations. In fact, any algorithm which can be expressed as a GSA iteration can also be written in the vertex-centric model.
-The messaging phase of the vertex-centric model is equivalent to the Gather and Sum steps of GSA: Gather can be seen as the phase where the messages are produced and Sum as the phase where they are routed to the target vertex. Similarly, the value update phase corresponds to the Apply step.
+### Scatter-Gather and GSA Comparison
+As seen in the examples above, Gather-Sum-Apply iterations are quite similar to scatter-gather iterations. In fact, any algorithm which can be expressed as a GSA iteration can also be written in the scatter-gather model.
+The messaging phase of the scatter-gather model is equivalent to the Gather and Sum steps of GSA: Gather can be seen as the phase where the messages are produced and Sum as the phase where they are routed to the target vertex. Similarly, the value update phase corresponds to the Apply step.
 
-The main difference between the two implementations is that the Gather phase of GSA parallelizes the computation over the edges, while the messaging phase distributes the computation over the vertices. Using the SSSP examples above, we see that in the first superstep of the vertex-centric case, vertices 1, 2 and 3 produce messages in parallel. Vertex 1 produces 3 messages, while vertices 2 and 3 produce one message each. In the GSA case on the other hand, the computation is parallelized over the edges: the three candidate distance values of vertex 1 are produced in parallel. Thus, if the Gather step contains "heavy" computation, it might be a better idea to use GSA and spread out the computation, instead of burdening a single vertex. Another case when parallelizing over the edges might prove to be more efficient is when the input graph is skewed (some vertices have a lot more neighbors than others).
+The main difference between the two implementations is that the Gather phase of GSA parallelizes the computation over the edges, while the messaging phase distributes the computation over the vertices. Using the SSSP examples above, we see that in the first superstep of the scatter-gather case, vertices 1, 2 and 3 produce messages in parallel. Vertex 1 produces 3 messages, while vertices 2 and 3 produce one message each. In the GSA case on the other hand, the computation is parallelized over the edges: the three candidate distance values of vertex 1 are produced in parallel. Thus, if the Gather step contains "heavy" computation, it might be a better idea to use GSA and spread out the computation, instead of burdening a single vertex. Another case when parallelizing over the edges might prove to be more efficient is when the input graph is skewed (some vertices have a lot more neighbors than others).
 
-Another difference between the two implementations is that the vertex-centric implementation uses a `coGroup` operator internally, while GSA uses a `reduce`. Therefore, if the function that combines neighbor values (messages) requires the whole group of values for the computation, vertex-centric should be used. If the update function is associative and commutative, then the GSA's reducer is expected to give a more efficient implementation, as it can make use of a combiner.
+Another difference between the two implementations is that the scatter-gather implementation uses a `coGroup` operator internally, while GSA uses a `reduce`. Therefore, if the function that combines neighbor values (messages) requires the whole group of values for the computation, scatter-gather should be used. If the update function is associative and commutative, then the GSA's reducer is expected to give a more efficient implementation, as it can make use of a combiner.
 
-Another thing to note is that GSA works strictly on neighborhoods, while in the vertex-centric model, a vertex can send a message to any vertex, given that it knows its vertex ID, regardless of whether it is a neighbor.
-Finally, in Gelly's vertex-centric implementation, one can choose the messaging direction, i.e. the direction in which updates propagate. GSA does not support this yet, so each vertex will be updated based on the values of its in-neighbors only.
+Another thing to note is that GSA works strictly on neighborhoods, while in the scatter-gather model, a vertex can send a message to any vertex, given that it knows its vertex ID, regardless of whether it is a neighbor.
+Finally, in Gelly's scatter-gather implementation, one can choose the messaging direction, i.e. the direction in which updates propagate. GSA does not support this yet, so each vertex will be updated based on the values of its in-neighbors only.
 
 Graph Validation
 -----------
@@ -1556,7 +1556,7 @@ In graph theory, communities refer to groups of nodes that are well connected in
 This library method is an implementation of the community detection algorithm described in the paper [Towards real-time community detection in large networks](http://arxiv.org/pdf/0808.2633.pdf%22%3Earticle%20explaining%20the%20algorithm%20in%20detail).
 
 #### Details
-The algorithm is implemented using [vertex-centric iterations](#vertex-centric-iterations).
+The algorithm is implemented using [scatter-gather iterations](#scatter-gather-iterations).
 Initially, each vertex is assigned a `Tuple2` containing its initial value along with a score equal to 1.0.
 In each iteration, vertices send their labels and scores to their neighbors. Upon receiving messages from its neighbors,
 a vertex chooses the label with the highest score and subsequently re-scores it using the edge values,
@@ -1578,7 +1578,7 @@ The constructor takes two parameters:
 This is an implementation of the well-known Label Propagation algorithm described in [this paper](http://journals.aps.org/pre/abstract/10.1103/PhysRevE.76.036106). The algorithm discovers communities in a graph, by iteratively propagating labels between neighbors. Unlike the [Community Detection library method](#community-detection), this implementation does not use scores associated with the labels.
 
 #### Details
-The algorithm is implemented using [vertex-centric iterations](#vertex-centric-iterations).
+The algorithm is implemented using [scatter-gather iterations](#scatter-gather-iterations).
 Labels are expected to be of type `Comparable` and are initialized using the vertex values of the input `Graph`.
 The algorithm iteratively refines discovered communities by propagating labels. In each iteration, a vertex adopts
 the label that is most frequent among its neighbors' labels. In case of a tie (i.e. two or more labels appear with the 
@@ -1599,7 +1599,7 @@ This is an implementation of the Weakly Connected Components algorithm. Upon con
 without taking edge direction into account.
 
 #### Details
-The algorithm is implemented using [vertex-centric iterations](#vertex-centric-iterations).
+The algorithm is implemented using [scatter-gather iterations](#scatter-gather-iterations).
 This implementation assumes that the vertex values of the input Graph are initialized with Long component IDs.
 The vertices propagate their current component ID in iterations. Upon receiving component IDs from its neighbors, a vertex adopts a new component ID if its value
 is lower than its current component ID. The algorithm converges when vertices no longer update their component ID value or when the maximum number of iterations has been reached.
@@ -1619,7 +1619,7 @@ See the [Connected Components](#connected-components) library method for impleme
 ### PageRank
 
 #### Overview
-An implementation of a simple [PageRank algorithm](https://en.wikipedia.org/wiki/PageRank), using [vertex-centric iterations](#vertex-centric-iterations).
+An implementation of a simple [PageRank algorithm](https://en.wikipedia.org/wiki/PageRank), using [scatter-gather iterations](#scatter-gather-iterations).
 PageRank is an algorithm that was first used to rank web search engine results. Today, the algorithm and many variations, are used in various graph application domains. The idea of PageRank is that important or relevant pages tend to link to other important pages.
 
 #### Details
@@ -1646,7 +1646,7 @@ See the [PageRank](#pagerank) library method for implementation details and usag
 An implementation of the Single-Source-Shortest-Paths algorithm for weighted graphs. Given a source vertex, the algorithm computes the shortest paths from this source to all other nodes in the graph.
 
 #### Details
-The algorithm is implemented using [vertex-centric iterations](#vertex-centric-iterations).
+The algorithm is implemented using [scatter-gather iterations](#scatter-gather-iterations).
 In each iteration, a vertex sends to its neighbors a message containing the sum its current distance and the edge weight connecting this vertex with the neighbor. Upon receiving candidate distance messages, a vertex calculates the minimum distance and, if a shorter path has been discovered, it updates its value. If a vertex does not change its value during a superstep, then it does not produce messages for its neighbors for the next superstep. The computation terminates after the specified maximum number of supersteps or when there are no value updates.
 
 #### Usage

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 737f758..19d6dd3 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.graph._
 import org.apache.flink.graph.validation.GraphValidator
 import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
-import org.apache.flink.graph.spargel.{MessagingFunction, VertexCentricConfiguration, VertexUpdateFunction}
+import org.apache.flink.graph.spargel.{MessagingFunction, ScatterGatherConfiguration, VertexUpdateFunction}
 import org.apache.flink.{graph => jg}
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.reflect.ClassTag
@@ -1027,39 +1027,39 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-   * Runs a Vertex-Centric iteration on the graph.
+   * Runs a scatter-gather iteration on the graph.
    * No configuration options are provided.
    *
    * @param vertexUpdateFunction the vertex update function
    * @param messagingFunction the messaging function
    * @param maxIterations maximum number of iterations to perform
    *
-   * @return the updated Graph after the vertex-centric iteration has converged or
+   * @return the updated Graph after the scatter-gather iteration has converged or
    *         after maximumNumberOfIterations.
    */
-  def runVertexCentricIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
+  def runScatterGatherIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
                                    messagingFunction: MessagingFunction[K, VV, M, EV],
                                    maxIterations: Int): Graph[K, VV, EV] = {
-    wrapGraph(jgraph.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
+    wrapGraph(jgraph.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
       maxIterations))
   }
 
   /**
-   * Runs a Vertex-Centric iteration on the graph with configuration options.
+   * Runs a scatter-gather iteration on the graph with configuration options.
    *
    * @param vertexUpdateFunction the vertex update function
    * @param messagingFunction the messaging function
    * @param maxIterations maximum number of iterations to perform
    * @param parameters the iteration configuration parameters
    *
-   * @return the updated Graph after the vertex-centric iteration has converged or
+   * @return the updated Graph after the scatter-gather iteration has converged or
    *         after maximumNumberOfIterations.
    */
-  def runVertexCentricIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
+  def runScatterGatherIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
                                    messagingFunction: MessagingFunction[K, VV, M, EV],
-                                   maxIterations: Int, parameters: VertexCentricConfiguration):
+                                   maxIterations: Int, parameters: ScatterGatherConfiguration):
   Graph[K, VV, EV] = {
-    wrapGraph(jgraph.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
+    wrapGraph(jgraph.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
       maxIterations, parameters))
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
index 8b918d4..827f1a3 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
@@ -31,7 +31,7 @@ import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
 
 /**
- * This example shows how to use Gelly's vertex-centric iterations.
+ * This example shows how to use Gelly's scatter-gather iterations.
  * 
  * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
  *
@@ -54,8 +54,8 @@ object SingleSourceShortestPaths {
     val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
     val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
 
-    // Execute the vertex-centric iteration
-    val result = graph.runVertexCentricIteration(new VertexDistanceUpdater,
+    // Execute the scatter-gather iteration
+    val result = graph.runScatterGatherIteration(new VertexDistanceUpdater,
       new MinDistanceMessenger, maxIterations)
 
     // Extract the vertices as the result

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index cf16fd3..ce8e895 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -52,8 +52,8 @@ import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.GatherSumApplyIteration;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricConfiguration;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.ScatterGatherIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.EdgeToTuple3Map;
 import org.apache.flink.graph.utils.Tuple2ToVertexMap;
@@ -1585,42 +1585,42 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Runs a Vertex-Centric iteration on the graph.
+	 * Runs a ScatterGather iteration on the graph.
 	 * No configuration options are provided.
 	 *
 	 * @param vertexUpdateFunction the vertex update function
 	 * @param messagingFunction the messaging function
 	 * @param maximumNumberOfIterations maximum number of iterations to perform
 	 * 
-	 * @return the updated Graph after the vertex-centric iteration has converged or
+	 * @return the updated Graph after the scatter-gather iteration has converged or
 	 * after maximumNumberOfIterations.
 	 */
-	public <M> Graph<K, VV, EV> runVertexCentricIteration(
+	public <M> Graph<K, VV, EV> runScatterGatherIteration(
 			VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
 			MessagingFunction<K, VV, M, EV> messagingFunction,
 			int maximumNumberOfIterations) {
 
-		return this.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
+		return this.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
 				maximumNumberOfIterations, null);
 	}
 
 	/**
-	 * Runs a Vertex-Centric iteration on the graph with configuration options.
+	 * Runs a ScatterGather iteration on the graph with configuration options.
 	 * 
 	 * @param vertexUpdateFunction the vertex update function
 	 * @param messagingFunction the messaging function
 	 * @param maximumNumberOfIterations maximum number of iterations to perform
 	 * @param parameters the iteration configuration parameters
 	 * 
-	 * @return the updated Graph after the vertex-centric iteration has converged or
+	 * @return the updated Graph after the scatter-gather iteration has converged or
 	 * after maximumNumberOfIterations.
 	 */
-	public <M> Graph<K, VV, EV> runVertexCentricIteration(
+	public <M> Graph<K, VV, EV> runScatterGatherIteration(
 			VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
 			MessagingFunction<K, VV, M, EV> messagingFunction,
-			int maximumNumberOfIterations, VertexCentricConfiguration parameters) {
+			int maximumNumberOfIterations, ScatterGatherConfiguration parameters) {
 
-		VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
+		ScatterGatherIteration<K, VV, M, EV> iteration = ScatterGatherIteration.withEdges(
 				edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
 
 		iteration.configure(parameters);

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
index b6f3f4e..a8c50bb 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
@@ -154,7 +154,6 @@ public class GraphCsvReader {
 			throw new RuntimeException("The edges input file cannot be null!");
 		}
 
-		@SuppressWarnings("serial")
 		DataSet<Tuple3<K, K, NullValue>> edges = edgeReader.types(vertexKey, vertexKey)
 				.map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
index f454376..9e00760 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
@@ -29,14 +29,14 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.IncrementalSSSPData;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricConfiguration;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 /**
  * This example illustrates how to 
  * <ul>
  *  <li> create a Graph directly from CSV files
- *  <li> use the vertex-centric iteration's messaging direction configuration option
+ *  <li> use the scatter-gather iteration's messaging direction configuration option
  * </ul>
  * 
  * Incremental Single Sink Shortest Paths Example. Shortest Paths are incrementally updated
@@ -89,15 +89,15 @@ public class IncrementalSSSP implements ProgramDescription {
 		graph.removeEdge(edgeToBeRemoved);
 
 		// configure the iteration
-		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
 
 		if(isInSSSP(edgeToBeRemoved, ssspGraph.getEdges())) {
 
 			parameters.setDirection(EdgeDirection.IN);
 			parameters.setOptDegrees(true);
 
-			// run the vertex centric iteration to propagate info
-			Graph<Long, Double, Double> result = ssspGraph.runVertexCentricIteration(new VertexDistanceUpdater(),
+			// run the scatter-gather iteration to propagate info
+			Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(new VertexDistanceUpdater(),
 					new InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters);
 
 			DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
index 40e304c..ba84e80 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
@@ -32,7 +32,7 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
- * This example shows how to use Gelly's vertex-centric iterations.
+ * This example shows how to use Gelly's scatter-gather iterations.
  * 
  * It is an implementation of the Single-Source-Shortest-Paths algorithm.
  * For a gather-sum-apply implementation of the same algorithm, please refer to {@link GSASingleSourceShortestPaths}. 
@@ -60,8 +60,8 @@ public class SingleSourceShortestPaths implements ProgramDescription {
 
 		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
 
-		// Execute the vertex-centric iteration
-		Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
+		// Execute the scatter-gather iteration
+		Graph<Long, Double, Double> result = graph.runScatterGatherIteration(
 				new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
 
 		// Extract the vertices as the result
@@ -196,6 +196,6 @@ public class SingleSourceShortestPaths implements ProgramDescription {
 
 	@Override
 	public String getDescription() {
-		return "Vertex-centric Single Source Shortest Paths";
+		return "Scatter-gather Single Source Shortest Paths";
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
index a670a72..4d68661 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
@@ -78,7 +78,7 @@ public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Gr
 		Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
 				Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();
 
-		return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater<K>(delta),
+		return graphWithScoredVertices.runScatterGatherIteration(new VertexLabelUpdater<K>(delta),
 				new LabelMessenger<K>(), maxIterations)
 				.mapVertices(new RemoveScoreFromVertexValuesMapper<K>());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index a190cc5..866f334 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -29,7 +29,7 @@ import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 
 /**
- * A vertex-centric implementation of the Weakly Connected Components algorithm.
+ * A scatter-gather implementation of the Weakly Connected Components algorithm.
  *
  * This implementation assumes that the vertex values of the input Graph are initialized with Long component IDs.
  * The vertices propagate their current component ID in iterations.
@@ -66,8 +66,8 @@ public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, D
 		Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>())
 				.getUndirected();
 
-		// initialize vertex values and run the Vertex Centric Iteration
-		return undirectedGraph.runVertexCentricIteration(
+		// initialize vertex values and run the Scatter-Gather Iteration
+		return undirectedGraph.runScatterGatherIteration(
 				new CCUpdater<K>(), new CCMessenger<K>(), maxIterations)
 				.getVertices();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 04ec60a..fef6808 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -77,7 +77,7 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV>
 		// iteratively adopt the most frequent label among the neighbors of each vertex
 		return input
 			.mapEdges(new NullValueEdgeMapper<K, EV>())
-			.runVertexCentricIteration(
+			.runScatterGatherIteration(
 				new UpdateVertexLabel<K, VV>(), new SendNewLabelToNeighbors<K, VV>(valueType), maxIterations)
 			.getVertices();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 935058d..9890a7c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 /**
- * This is an implementation of a simple PageRank algorithm, using a vertex-centric iteration.
+ * This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration.
  * The user can define the damping factor and the maximum number of iterations.
  * If the number of vertices of the input graph is known, it should be provided as a parameter
  * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
@@ -88,7 +88,7 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 		Graph<K, Double, Double> networkWithWeights = network
 				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
 
-		return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
+		return networkWithWeights.runScatterGatherIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
 				new RankMessenger<K>(numberOfVertices), maxIterations)
 				.getVertices();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index baf7fe1..0c5080d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -29,7 +29,7 @@ import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 /**
- * This is an implementation of the Single-Source-Shortest Paths algorithm, using a vertex-centric iteration.
+ * This is an implementation of the Single-Source-Shortest Paths algorithm, using a scatter-gather iteration.
  */
 @SuppressWarnings("serial")
 public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
@@ -52,7 +52,7 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D
 	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
 
 		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
-				.runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
+				.runScatterGatherIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
 				maxIterations).getVertices();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index f29fc9d..d25c294 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -33,7 +33,7 @@ import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
 /**
- * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
+ * The base class for functions that produce messages between vertices as a part of a {@link ScatterGatherIteration}.
  * 
  * @param <K> The type of the vertex key (the vertex identifier).
  * @param <VV> The type of the vertex value (the state of the vertex).
@@ -66,7 +66,7 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa
 
 	// --------------------------------------------------------------------------------------------
 	//  Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run
-	//  the vertex centric iteration.
+	//  the scatter gather iteration.
 	// --------------------------------------------------------------------------------------------
 
 	private EdgeDirection direction;
@@ -233,7 +233,7 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa
 	/**
 	 * Gets the broadcast data set registered under the given name. Broadcast data sets
 	 * are available on all parallel instances of a function. They can be registered via
-	 * {@link org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
+	 * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
 	 * 
 	 * @param name The name under which the broadcast set is registered.
 	 * @return The broadcast data set.

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
new file mode 100644
index 0000000..3a3de64
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.IterationConfiguration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A ScatterGatherConfiguration object can be used to set the iteration name and
+ * degree of parallelism, to register aggregators and use broadcast sets in
+ * the {@link org.apache.flink.graph.spargel.VertexUpdateFunction} and {@link org.apache.flink.graph.spargel.MessagingFunction}
+ *
+ * The VertexCentricConfiguration object is passed as an argument to
+ * {@link org.apache.flink.graph.Graph#runScatterGatherIteration (
+ * org.apache.flink.graph.spargel.VertexUpdateFunction, org.apache.flink.graph.spargel.MessagingFunction, int,
+ * ScatterGatherConfiguration)}.
+ */
+public class ScatterGatherConfiguration extends IterationConfiguration {
+
+	/** the broadcast variables for the update function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>();
+
+	/** the broadcast variables for the messaging function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>();
+
+	/** flag that defines whether the degrees option is set **/
+	private boolean optDegrees = false;
+
+	/** the direction in which the messages should be sent **/
+	private EdgeDirection direction = EdgeDirection.OUT;
+
+	public ScatterGatherConfiguration() {}
+
+	/**
+	 * Adds a data set as a broadcast set to the messaging function.
+	 *
+	 * @param name The name under which the broadcast data is available in the messaging function.
+	 * @param data The data set to be broadcasted.
+	 */
+	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
+		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Adds a data set as a broadcast set to the vertex update function.
+	 *
+	 * @param name The name under which the broadcast data is available in the vertex update function.
+	 * @param data The data set to be broadcasted.
+	 */
+	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
+		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Get the broadcast variables of the VertexUpdateFunction.
+	 *
+	 * @return a List of Tuple2, where the first field is the broadcast variable name
+	 * and the second field is the broadcast DataSet.
+	 */
+	public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() {
+		return this.bcVarsUpdate;
+	}
+
+	/**
+	 * Get the broadcast variables of the MessagingFunction.
+	 *
+	 * @return a List of Tuple2, where the first field is the broadcast variable name
+	 * and the second field is the broadcast DataSet.
+	 */
+	public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
+		return this.bcVarsMessaging;
+	}
+
+	/**
+	 * Gets whether the degrees option is set.
+	 * By default, the degrees option is not set.
+	 *
+	 * @return True, if the degree option is set, false otherwise.
+	 */
+	public boolean isOptDegrees() {
+		return optDegrees;
+	}
+
+	/**
+	 * Sets the degree option.
+	 * By default, the degrees option is not set.
+	 *
+	 * @param optDegrees True, to set this option, false otherwise.
+	 */
+	public void setOptDegrees(boolean optDegrees) {
+		this.optDegrees = optDegrees;
+	}
+
+	/**
+	 * Gets the direction in which messages are sent in the MessagingFunction.
+	 * By default the messaging direction is OUT.
+	 *
+	 * @return an EdgeDirection, which can be either IN, OUT or ALL.
+	 */
+	public EdgeDirection getDirection() {
+		return direction;
+	}
+
+	/**
+	 * Sets the direction in which messages are sent in the MessagingFunction.
+	 * By default the messaging direction is OUT.
+	 *
+	 * @param direction - IN, OUT or ALL
+	 */
+	public void setDirection(EdgeDirection direction) {
+		this.direction = direction;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
new file mode 100644
index 0000000..8023eea
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.CoGroupOperator;
+import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class represents iterative graph computations, programmed in a scatter-gather perspective.
+ * It is a special case of <i>Bulk Synchronous Parallel</i> computation.
+ * <p>
+ * Scatter-Gather algorithms operate on graphs, which are defined through vertices and edges. The 
+ * algorithms send messages along the edges and update the state of vertices based on
+ * the old state and the incoming messages. All vertices have an initial state.
+ * The computation terminates once no vertex updates it state any more.
+ * Additionally, a maximum number of iterations (supersteps) may be specified.
+ * <p>
+ * The computation is here represented by two functions:
+ * <ul>
+ *   <li>The {@link VertexUpdateFunction} receives incoming messages and may updates the state for
+ *   the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
+ *   considered updated.</li>
+ *   <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
+ *   edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
+ * </ul>
+ * <p>
+ *
+ * Scatter-Gather graph iterations are are run by calling
+ * {@link Graph#runScatterGatherIteration(VertexUpdateFunction, MessagingFunction, int)}.
+ *
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EV> The type of the values that are associated with the edges.
+ */
+public class ScatterGatherIteration<K, VV, Message, EV> 
+	implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>>
+{
+	private final VertexUpdateFunction<K, VV, Message> updateFunction;
+
+	private final MessagingFunction<K, VV, Message, EV> messagingFunction;
+	
+	private final DataSet<Edge<K, EV>> edgesWithValue;
+	
+	private final int maximumNumberOfIterations;
+	
+	private final TypeInformation<Message> messageType;
+	
+	private DataSet<Vertex<K, VV>> initialVertices;
+
+	private ScatterGatherConfiguration configuration;
+
+	// ----------------------------------------------------------------------------------
+	
+	private ScatterGatherIteration(VertexUpdateFunction<K, VV, Message> uf,
+			MessagingFunction<K, VV, Message, EV> mf,
+			DataSet<Edge<K, EV>> edgesWithValue, 
+			int maximumNumberOfIterations)
+	{
+		Preconditions.checkNotNull(uf);
+		Preconditions.checkNotNull(mf);
+		Preconditions.checkNotNull(edgesWithValue);
+		Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+
+		this.updateFunction = uf;
+		this.messagingFunction = mf;
+		this.edgesWithValue = edgesWithValue;
+		this.maximumNumberOfIterations = maximumNumberOfIterations;		
+		this.messageType = getMessageType(mf);
+	}
+	
+	private TypeInformation<Message> getMessageType(MessagingFunction<K, VV, Message, EV> mf) {
+		return TypeExtractor.createTypeInfo(mf, MessagingFunction.class, mf.getClass(), 2);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom Operator behavior
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Sets the input data set for this operator. In the case of this operator this input data set represents
+	 * the set of vertices with their initial state.
+	 * 
+	 * @param inputData The input data set, which in the case of this operator represents the set of
+	 *                  vertices with their initial state.
+	 * 
+	 * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
+	 */
+	@Override
+	public void setInput(DataSet<Vertex<K, VV>> inputData) {
+		this.initialVertices = inputData;
+	}
+	
+	/**
+	 * Creates the operator that represents this scatter-gather graph computation.
+	 * 
+	 * @return The operator that represents this scatter-gather graph computation.
+	 */
+	@Override
+	public DataSet<Vertex<K, VV>> createResult() {
+		if (this.initialVertices == null) {
+			throw new IllegalStateException("The input data set has not been set.");
+		}
+
+		// prepare some type information
+		TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
+		TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<K,Message>>(keyType, messageType);
+
+		// create a graph
+		Graph<K, VV, EV> graph =
+				Graph.fromDataSet(initialVertices, edgesWithValue, initialVertices.getExecutionEnvironment());
+
+		// check whether the numVertices option is set and, if so, compute the total number of vertices
+		// and set it within the messaging and update functions
+
+		if (this.configuration != null && this.configuration.isOptNumVertices()) {
+			try {
+				long numberOfVertices = graph.numberOfVertices();
+				messagingFunction.setNumberOfVertices(numberOfVertices);
+				updateFunction.setNumberOfVertices(numberOfVertices);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+
+		if(this.configuration != null) {
+			messagingFunction.setDirection(this.configuration.getDirection());
+		} else {
+			messagingFunction.setDirection(EdgeDirection.OUT);
+		}
+
+		// retrieve the direction in which the updates are made and in which the messages are sent
+		EdgeDirection messagingDirection = messagingFunction.getDirection();
+
+		// check whether the degrees option is set and, if so, compute the in and the out degrees and
+		// add them to the vertex value
+		if(this.configuration != null && this.configuration.isOptDegrees()) {
+			return createResultVerticesWithDegrees(graph, messagingDirection, messageTypeInfo);
+		} else {
+			return createResultSimpleVertex(messagingDirection, messageTypeInfo);
+		}
+	}
+
+	/**
+	 * Creates a new scatter-gather iteration operator for graphs where the edges are associated with a value (such as
+	 * a weight or distance).
+	 * 
+	 * @param edgesWithValue The data set containing edges.
+	 * @param uf The function that updates the state of the vertices from the incoming messages.
+	 * @param mf The function that turns changed vertex states into messages along the edges.
+	 * 
+	 * @param <K> The type of the vertex key (the vertex identifier).
+	 * @param <VV> The type of the vertex value (the state of the vertex).
+	 * @param <Message> The type of the message sent between vertices along the edges.
+	 * @param <EV> The type of the values that are associated with the edges.
+	 * 
+	 * @return An in stance of the scatter-gather graph computation operator.
+	 */
+	public static final <K, VV, Message, EV>
+			ScatterGatherIteration<K, VV, Message, EV> withEdges(
+					DataSet<Edge<K, EV>> edgesWithValue,
+					VertexUpdateFunction<K, VV, Message> uf,
+					MessagingFunction<K, VV, Message, EV> mf,
+					int maximumNumberOfIterations)
+	{
+		return new ScatterGatherIteration<K, VV, Message, EV>(uf, mf, edgesWithValue, maximumNumberOfIterations);
+	}
+
+	/**
+	 * Configures this scatter-gather iteration with the provided parameters.
+	 *
+	 * @param parameters the configuration parameters
+	 */
+	public void configure(ScatterGatherConfiguration parameters) {
+		this.configuration = parameters;
+	}
+
+	/**
+	 * @return the configuration parameters of this scatter-gather iteration
+	 */
+	public ScatterGatherConfiguration getIterationConfiguration() {
+		return this.configuration;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Wrapping UDFs
+	// --------------------------------------------------------------------------------------------
+
+	private static abstract class VertexUpdateUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
+		Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
+		implements ResultTypeQueryable<Vertex<K, VVWithDegrees>>
+	{
+		private static final long serialVersionUID = 1L;
+		
+		final VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction;
+
+		final MessageIterator<Message> messageIter = new MessageIterator<Message>();
+		
+		private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;
+		
+		
+		private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction,
+				TypeInformation<Vertex<K, VVWithDegrees>> resultType)
+		{
+			this.vertexUpdateFunction = vertexUpdateFunction;
+			this.resultType = resultType;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.vertexUpdateFunction.init(getIterationRuntimeContext());
+			}
+			this.vertexUpdateFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.vertexUpdateFunction.postSuperstep();
+		}
+
+		@Override
+		public TypeInformation<Vertex<K, VVWithDegrees>> getProducedType() {
+			return this.resultType;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class VertexUpdateUdfSimpleVV<K, VV, Message> extends VertexUpdateUdf<K, VV, Message> {
+
+		private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> resultType) {
+			super(vertexUpdateFunction, resultType);
+		}
+
+		@Override
+		public void coGroup(Iterable<Tuple2<K, Message>> messages,
+							Iterable<Vertex<K, VV>> vertex,
+							Collector<Vertex<K, VV>> out) throws Exception {
+			final Iterator<Vertex<K, VV>> vertexIter = vertex.iterator();
+
+			if (vertexIter.hasNext()) {
+				Vertex<K, VV> vertexState = vertexIter.next();
+
+				@SuppressWarnings("unchecked")
+				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
+				messageIter.setSource(downcastIter);
+
+				vertexUpdateFunction.setOutput(vertexState, out);
+				vertexUpdateFunction.updateVertex(vertexState, messageIter);
+			}
+			else {
+				final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
+				if (messageIter.hasNext()) {
+					String message = "Target vertex does not exist!.";
+					try {
+						Tuple2<K, Message> next = messageIter.next();
+						message = "Target vertex '" + next.f0 + "' does not exist!.";
+					} catch (Throwable t) {}
+					throw new Exception(message);
+				} else {
+					throw new Exception();
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> {
+
+		private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> vertexUpdateFunction,
+				TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> resultType) {
+			super(vertexUpdateFunction, resultType);
+		}
+		
+		@Override
+		public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> vertex,
+							Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
+
+			final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertexIter = vertex.iterator();
+		
+			if (vertexIter.hasNext()) {
+				Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = vertexIter.next();
+		
+				@SuppressWarnings("unchecked")
+				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
+				messageIter.setSource(downcastIter);
+
+				vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1);
+				vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2);
+
+				vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
+				vertexUpdateFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, messageIter);
+			}
+			else {
+				final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
+				if (messageIter.hasNext()) {
+					String message = "Target vertex does not exist!.";
+					try {
+						Tuple2<K, Message> next = messageIter.next();
+						message = "Target vertex '" + next.f0 + "' does not exist!.";
+					} catch (Throwable t) {}
+					throw new Exception(message);
+				} else {
+					throw new Exception();
+				}
+			}
+		}
+	}
+
+	/*
+	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
+	 */
+	private static abstract class MessagingUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
+		extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
+		implements ResultTypeQueryable<Tuple2<K, Message>>
+	{
+		private static final long serialVersionUID = 1L;
+		
+		final MessagingFunction<K, VV, Message, EV> messagingFunction;
+		
+		private transient TypeInformation<Tuple2<K, Message>> resultType;
+	
+	
+		private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, Message, EV> messagingFunction,
+				TypeInformation<Tuple2<K, Message>> resultType)
+		{
+			this.messagingFunction = messagingFunction;
+			this.resultType = resultType;
+		}
+		
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.messagingFunction.init(getIterationRuntimeContext());
+			}
+			
+			this.messagingFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.messagingFunction.postSuperstep();
+		}
+		
+		@Override
+		public TypeInformation<Tuple2<K, Message>> getProducedType() {
+			return this.resultType;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>
+		extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> {
+
+		private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, Message, EV> messagingFunction,
+			TypeInformation<Tuple2<K, Message>> resultType) {
+			super(messagingFunction, resultType);
+		}
+
+		@Override
+		public void coGroup(Iterable<Edge<K, EV>> edges,
+							Iterable<Vertex<K, VV>> state,
+							Collector<Tuple2<K, Message>> out) throws Exception {
+			final Iterator<Vertex<K, VV>> stateIter = state.iterator();
+		
+			if (stateIter.hasNext()) {
+				Vertex<K, VV> newVertexState = stateIter.next();
+				messagingFunction.set((Iterator<?>) edges.iterator(), out, newVertexState.getId());
+				messagingFunction.sendMessages(newVertexState);
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
+		extends MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> {
+
+		private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
+
+		private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, VV, Message, EV> messagingFunction,
+				TypeInformation<Tuple2<K, Message>> resultType) {
+			super(messagingFunction, resultType);
+		}
+
+		@Override
+		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> state,
+				Collector<Tuple2<K, Message>> out) throws Exception {
+
+			final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> stateIter = state.iterator();
+		
+			if (stateIter.hasNext()) {
+				Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = stateIter.next();
+
+				nextVertex.setField(vertexWithDegrees.f0, 0);
+				nextVertex.setField(vertexWithDegrees.f1.f0, 1);
+
+				messagingFunction.setInDegree(vertexWithDegrees.f1.f1);
+				messagingFunction.setOutDegree(vertexWithDegrees.f1.f2);
+
+				messagingFunction.set((Iterator<?>) edges.iterator(), out, vertexWithDegrees.getId());
+				messagingFunction.sendMessages(nextVertex);
+			}
+		}
+	}
+
+
+	// --------------------------------------------------------------------------------------------
+	//  UTIL methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Method that builds the messaging function using a coGroup operator for a simple vertex(without
+	 * degrees).
+	 * It afterwards configures the function with a custom name and broadcast variables.
+	 *
+	 * @param iteration
+	 * @param messageTypeInfo
+	 * @param whereArg the argument for the where within the coGroup
+	 * @param equalToArg the argument for the equalTo within the coGroup
+	 * @return the messaging function
+	 */
+	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(
+			DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
+			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+
+		// build the messaging function (co group)
+		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
+		MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
+				new MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
+
+		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
+				.equalTo(equalToArg).with(messenger);
+
+		// configure coGroup message function with name and broadcast variables
+		messages = messages.name("Messaging");
+		if(this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+				messages = messages.withBroadcastSet(e.f1, e.f0);
+			}
+		}
+
+		return messages;
+	}
+
+	/**
+	 * Method that builds the messaging function using a coGroup operator for a vertex
+	 * containing degree information.
+	 * It afterwards configures the function with a custom name and broadcast variables.
+	 *
+	 * @param iteration
+	 * @param messageTypeInfo
+	 * @param whereArg the argument for the where within the coGroup
+	 * @param equalToArg the argument for the equalTo within the coGroup
+	 * @return the messaging function
+	 */
+	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
+			DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
+			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+
+		// build the messaging function (co group)
+		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
+		MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> messenger =
+				new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
+
+		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
+				.equalTo(equalToArg).with(messenger);
+
+		// configure coGroup message function with name and broadcast variables
+		messages = messages.name("Messaging");
+
+		if (this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+				messages = messages.withBroadcastSet(e.f1, e.f0);
+			}
+		}
+
+		return messages;
+	}
+
+	/**
+	 * Helper method which sets up an iteration with the given vertex value(either simple or with degrees)
+	 *
+	 * @param iteration
+	 */
+
+	private void setUpIteration(DeltaIteration<?, ?> iteration) {
+
+		// set up the iteration operator
+		if (this.configuration != null) {
+
+			iteration.name(this.configuration.getName("Scatter-gather iteration (" + updateFunction + " | " + messagingFunction + ")"));
+			iteration.parallelism(this.configuration.getParallelism());
+			iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
+
+			// register all aggregators
+			for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
+				iteration.registerAggregator(entry.getKey(), entry.getValue());
+			}
+		}
+		else {
+			// no configuration provided; set default name
+			iteration.name("Scatter-gather iteration (" + updateFunction + " | " + messagingFunction + ")");
+		}
+	}
+
+	/**
+	 * Creates the operator that represents this scatter-gather graph computation for a simple vertex.
+	 *
+	 * @param messagingDirection
+	 * @param messageTypeInfo
+	 * @return the operator
+	 */
+	private DataSet<Vertex<K, VV>> createResultSimpleVertex(EdgeDirection messagingDirection,
+		TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
+
+		DataSet<Tuple2<K, Message>> messages;
+
+		TypeInformation<Vertex<K, VV>> vertexTypes = initialVertices.getType();
+
+		final DeltaIteration<Vertex<K, VV>,	Vertex<K, VV>> iteration =
+				initialVertices.iterateDelta(initialVertices, this.maximumNumberOfIterations, 0);
+				setUpIteration(iteration);
+
+		switch (messagingDirection) {
+			case IN:
+				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0);
+				break;
+			case OUT:
+				messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0);
+				break;
+			case ALL:
+				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0)
+						.union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0)) ;
+				break;
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+
+		VertexUpdateUdf<K, VV, Message> updateUdf =
+				new VertexUpdateUdfSimpleVV<K, VV, Message>(updateFunction, vertexTypes);
+
+		// build the update function (co group)
+		CoGroupOperator<?, ?, Vertex<K, VV>> updates =
+				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+
+		configureUpdateFunction(updates);
+
+		return iteration.closeWith(updates, updates);
+	}
+
+	/**
+	 * Creates the operator that represents this scatter-gather graph computation for a vertex with in
+	 * and out degrees added to the vertex value.
+	 *
+	 * @param graph
+	 * @param messagingDirection
+	 * @param messageTypeInfo
+	 * @return the operator
+	 */
+	@SuppressWarnings("serial")
+	private DataSet<Vertex<K, VV>> createResultVerticesWithDegrees(Graph<K, VV, EV> graph, EdgeDirection messagingDirection,
+			TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
+
+		DataSet<Tuple2<K, Message>> messages;
+
+		this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
+
+		DataSet<Tuple2<K, Long>> inDegrees = graph.inDegrees();
+		DataSet<Tuple2<K, Long>> outDegrees = graph.outDegrees();
+
+		DataSet<Tuple3<K, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
+				.with(new FlatJoinFunction<Tuple2<K, Long>, Tuple2<K, Long>, Tuple3<K, Long, Long>>() {
+
+					@Override
+					public void join(Tuple2<K, Long> first, Tuple2<K, Long> second,	Collector<Tuple3<K, Long, Long>> out) {
+						out.collect(new Tuple3<K, Long, Long>(first.f0, first.f1, second.f1));
+					}
+				}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
+
+		DataSet<Vertex<K, Tuple3<VV, Long, Long>>> verticesWithDegrees = initialVertices
+				.join(degrees).where(0).equalTo(0)
+				.with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K,Long,Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
+					@Override
+					public void join(Vertex<K, VV> vertex, Tuple3<K, Long, Long> degrees,
+									Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
+
+						out.collect(new Vertex<K, Tuple3<VV, Long, Long>>(vertex.getId(),
+								new Tuple3<VV, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
+					}
+				}).withForwardedFieldsFirst("f0");
+
+		// add type info
+		TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
+
+		final DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>,	Vertex<K, Tuple3<VV, Long, Long>>> iteration =
+				verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
+				setUpIteration(iteration);
+
+		switch (messagingDirection) {
+			case IN:
+				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0);
+				break;
+			case OUT:
+				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0);
+				break;
+			case ALL:
+				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0)
+						.union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0)) ;
+				break;
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> updateUdf =
+				new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
+
+		// build the update function (co group)
+		CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
+				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+
+		configureUpdateFunction(updates);
+
+		return iteration.closeWith(updates, updates).map(
+				new MapFunction<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, VV>>() {
+
+					public Vertex<K, VV> map(Vertex<K, Tuple3<VV, Long, Long>> vertex) {
+						return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
+					}
+				});
+	}
+
+	private <VVWithDegree> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> updates) {
+
+		// configure coGroup update function with name and broadcast variables
+		updates = updates.name("Vertex State Updates");
+		if (this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
+				updates = updates.withBroadcastSet(e.f1, e.f0);
+			}
+		}
+
+		// let the operator know that we preserve the key field
+		updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8677464/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
deleted file mode 100644
index afd4ffd..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.IterationConfiguration;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A VertexCentricConfiguration object can be used to set the iteration name and
- * degree of parallelism, to register aggregators and use broadcast sets in
- * the {@link org.apache.flink.graph.spargel.VertexUpdateFunction} and {@link org.apache.flink.graph.spargel.MessagingFunction}
- *
- * The VertexCentricConfiguration object is passed as an argument to
- * {@link org.apache.flink.graph.Graph#runVertexCentricIteration (
- * org.apache.flink.graph.spargel.VertexUpdateFunction, org.apache.flink.graph.spargel.MessagingFunction, int,
- * VertexCentricConfiguration)}.
- */
-public class VertexCentricConfiguration extends IterationConfiguration {
-
-	/** the broadcast variables for the update function **/
-	private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>();
-
-	/** the broadcast variables for the messaging function **/
-	private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>();
-
-	/** flag that defines whether the degrees option is set **/
-	private boolean optDegrees = false;
-
-	/** the direction in which the messages should be sent **/
-	private EdgeDirection direction = EdgeDirection.OUT;
-
-	public VertexCentricConfiguration() {}
-
-	/**
-	 * Adds a data set as a broadcast set to the messaging function.
-	 *
-	 * @param name The name under which the broadcast data is available in the messaging function.
-	 * @param data The data set to be broadcasted.
-	 */
-	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
-		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-
-	/**
-	 * Adds a data set as a broadcast set to the vertex update function.
-	 *
-	 * @param name The name under which the broadcast data is available in the vertex update function.
-	 * @param data The data set to be broadcasted.
-	 */
-	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
-		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-
-	/**
-	 * Get the broadcast variables of the VertexUpdateFunction.
-	 *
-	 * @return a List of Tuple2, where the first field is the broadcast variable name
-	 * and the second field is the broadcast DataSet.
-	 */
-	public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() {
-		return this.bcVarsUpdate;
-	}
-
-	/**
-	 * Get the broadcast variables of the MessagingFunction.
-	 *
-	 * @return a List of Tuple2, where the first field is the broadcast variable name
-	 * and the second field is the broadcast DataSet.
-	 */
-	public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
-		return this.bcVarsMessaging;
-	}
-
-	/**
-	 * Gets whether the degrees option is set.
-	 * By default, the degrees option is not set.
-	 *
-	 * @return True, if the degree option is set, false otherwise.
-	 */
-	public boolean isOptDegrees() {
-		return optDegrees;
-	}
-
-	/**
-	 * Sets the degree option.
-	 * By default, the degrees option is not set.
-	 *
-	 * @param optDegrees True, to set this option, false otherwise.
-	 */
-	public void setOptDegrees(boolean optDegrees) {
-		this.optDegrees = optDegrees;
-	}
-
-	/**
-	 * Gets the direction in which messages are sent in the MessagingFunction.
-	 * By default the messaging direction is OUT.
-	 *
-	 * @return an EdgeDirection, which can be either IN, OUT or ALL.
-	 */
-	public EdgeDirection getDirection() {
-		return direction;
-	}
-
-	/**
-	 * Sets the direction in which messages are sent in the MessagingFunction.
-	 * By default the messaging direction is OUT.
-	 *
-	 * @param direction - IN, OUT or ALL
-	 */
-	public void setDirection(EdgeDirection direction) {
-		this.direction = direction;
-	}
-
-}


Mime
View raw message