[FLINK-3207] [gelly] add pregel methods to gelly-scala
This closes #1575
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/048cda72
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/048cda72
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/048cda72
Branch: refs/heads/master
Commit: 048cda72b686d200d8e8f455ee3fc6cbeb173ef0
Parents: cc1a797
Author: vasia <vasia@apache.org>
Authored: Tue Feb 2 16:43:04 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Mon Mar 21 19:10:29 2016 +0100

docs/apis/batch/libs/fig/LICENSE.txt  2 +
.../batch/libs/fig/gelly-gsa-sssp-result.png  Bin 14472 -> 0 bytes
docs/apis/batch/libs/fig/gelly-gsa-sssp2.png  Bin 24962 -> 0 bytes
docs/apis/batch/libs/fig/gelly-vc-sssp2.png  Bin 24644 -> 0 bytes
.../libs/fig/vertex-centric supersteps.png  Bin 0 -> 80198 bytes
docs/apis/batch/libs/gelly.md  322 ++
docs/libs/fig/vertex-centric supersteps.png  Bin 80198 -> 0 bytes
docs/libs/gelly_guide.md  1968 +
.../apache/flink/graph/examples/PregelSSSP.java  194 ++
.../SingleSourceShortestPathsITCase.java  2 +
.../org/apache/flink/graph/scala/Graph.scala  40 +
.../main/java/org/apache/flink/graph/Graph.java  10 +
.../apache/flink/graph/example/PregelSSSP.java  194 
.../flink/graph/pregel/ComputeFunction.java  37 +
.../flink/graph/pregel/MessageCombiner.java  4 +
.../flink/graph/pregel/MessageIterator.java  7 +
.../pregel/VertexCentricConfiguration.java  4 +
.../graph/pregel/VertexCentricIteration.java  72 +
18 files changed, 608 insertions(+), 2248 deletions(-)

http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/apis/batch/libs/fig/LICENSE.txt

diff --git a/docs/apis/batch/libs/fig/LICENSE.txt b/docs/apis/batch/libs/fig/LICENSE.txt
index 35b8673..5d0d22b 100644
--- a/docs/apis/batch/libs/fig/LICENSE.txt
+++ b/docs/apis/batch/libs/fig/LICENSE.txt
@@ -14,4 +14,4 @@ software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
-under the License.
\ No newline at end of file
+under the License.
http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/apis/batch/libs/fig/gelly-gsa-sssp-result.png

diff --git a/docs/apis/batch/libs/fig/gelly-gsa-sssp-result.png b/docs/apis/batch/libs/fig/gelly-gsa-sssp-result.png
deleted file mode 100644
index 6ae74dd..0000000
Binary files a/docs/apis/batch/libs/fig/gelly-gsa-sssp-result.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/apis/batch/libs/fig/gelly-gsa-sssp2.png

diff --git a/docs/apis/batch/libs/fig/gelly-gsa-sssp2.png b/docs/apis/batch/libs/fig/gelly-gsa-sssp2.png
deleted file mode 100644
index edf19b8..0000000
Binary files a/docs/apis/batch/libs/fig/gelly-gsa-sssp2.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/apis/batch/libs/fig/gelly-vc-sssp2.png

diff --git a/docs/apis/batch/libs/fig/gelly-vc-sssp2.png b/docs/apis/batch/libs/fig/gelly-vc-sssp2.png
deleted file mode 100644
index 67976b3..0000000
Binary files a/docs/apis/batch/libs/fig/gelly-vc-sssp2.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/apis/batch/libs/fig/vertex-centric supersteps.png

diff --git a/docs/apis/batch/libs/fig/vertex-centric supersteps.png b/docs/apis/batch/libs/fig/vertex-centric supersteps.png
new file mode 100644
index 0000000..6498a25
Binary files /dev/null and b/docs/apis/batch/libs/fig/vertex-centric supersteps.png differ
http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/apis/batch/libs/gelly.md

diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index b22e362..d803be2 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -790,7 +790,256 @@ When the aggregation computation does not require access to the vertex value (fo
Iterative Graph Processing

-Gelly exploits Flink's efficient iteration operators to support large-scale iterative graph processing. Currently, we provide implementations of the popular scatter-gather iterative model and a variation of Gather-Sum-Apply. In the following sections, we describe these models and show how you can use them in Gelly.
+Gelly exploits Flink's efficient iteration operators to support large-scale iterative graph processing. Currently, we provide implementations of the vertex-centric, scatter-gather, and gather-sum-apply models. In the following sections, we describe these abstractions and show how you can use them in Gelly.
+
+### Vertex-Centric Iterations
+The vertex-centric model, also known as "think like a vertex" or "Pregel", expresses computation from the perspective of a vertex in the graph.
+The computation proceeds in synchronized iteration steps, called supersteps. In each superstep, each vertex executes one user-defined function.
+Vertices communicate with other vertices through messages. A vertex can send a message to any other vertex in the graph, as long as it knows that vertex's unique ID.
+
+The computational model is shown in the figure below. The dotted boxes correspond to parallelization units.
+In each superstep, all active vertices execute the
+same user-defined computation in parallel. Supersteps are executed synchronously, so that messages sent during one superstep are guaranteed to be delivered at the beginning of the next superstep.
+
+<p class="text-center">
+    <img alt="Vertex-Centric Computational Model" width="70%" src="fig/vertex-centric supersteps.png"/>
+</p>
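The synchronous delivery guarantee can be illustrated without Flink. The plain-Java sketch below is not Gelly's implementation, and `SuperstepDelivery` and `firstReceipt` are hypothetical names. It keeps two message buffers: whatever a vertex sends during superstep s goes into an outbox that only becomes the inbox of superstep s + 1, and the loop stops as soon as no messages are in flight.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of synchronized supersteps; SuperstepDelivery is hypothetical, not Gelly code.
final class SuperstepDelivery {

    /**
     * Floods a signal from the seed along directed edges and returns, for every vertex
     * that receives a message, the superstep in which it was first received.
     * Every vertex (including sinks) must appear as a key of outEdges.
     */
    static Map<Long, Integer> firstReceipt(Map<Long, List<Long>> outEdges, long seed, int maxSupersteps) {
        Map<Long, Integer> firstSeen = new HashMap<>();
        // Messages readable in the current superstep (sent during the previous one).
        Map<Long, List<Long>> inbox = new HashMap<>();
        for (int superstep = 1; superstep <= maxSupersteps; superstep++) {
            // Messages sent now are buffered and only delivered in the next superstep.
            Map<Long, List<Long>> outbox = new HashMap<>();
            for (Map.Entry<Long, List<Long>> entry : outEdges.entrySet()) {
                long vertex = entry.getKey();
                boolean newlyReached = inbox.containsKey(vertex) && !firstSeen.containsKey(vertex);
                if (newlyReached) {
                    firstSeen.put(vertex, superstep);
                }
                // The seed is active in superstep 1; any other vertex forwards only when first reached.
                if ((superstep == 1 && vertex == seed) || newlyReached) {
                    for (long target : entry.getValue()) {
                        outbox.computeIfAbsent(target, k -> new ArrayList<>()).add(vertex);
                    }
                }
            }
            if (outbox.isEmpty()) {
                break; // no messages in flight: every vertex is inactive
            }
            inbox = outbox; // superstep barrier
        }
        return firstSeen;
    }
}
```

On the chain with edges (1, 2), (2, 3) and (3, 4) seeded at vertex 1, `firstReceipt` maps vertex 2 to superstep 2, vertex 3 to superstep 3 and vertex 4 to superstep 4: no message skips ahead of the barrier.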
+
+To use vertex-centric iterations in Gelly, the user only needs to define the vertex compute function, `ComputeFunction`.
+This function and the maximum number of iterations to run are given as parameters to Gelly's `runVertexCentricIteration`. This method executes the vertex-centric iteration on the input Graph and returns a new Graph with updated vertex values. An optional message combiner, `MessageCombiner`, can be defined to reduce communication costs.
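As a rough illustration of what a combiner buys, the following plain-Java sketch collapses all messages bound for the same target into a single one before they would be shipped. It is not Gelly's `MessageCombiner` API; `MinCombinerSketch` and its `Message` class are hypothetical, and the `min` rule assumes a computation, such as shortest paths, that only needs the smallest incoming value.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of min-combining messages per target vertex.
// MinCombinerSketch and Message are hypothetical; this is not Gelly's MessageCombiner.
final class MinCombinerSketch {

    /** A candidate value addressed to a target vertex ID. */
    static final class Message {
        final long target;
        final double value;

        Message(long target, double value) {
            this.target = target;
            this.value = value;
        }
    }

    /** Collapses all messages for the same target into one: the minimum value. */
    static Map<Long, Double> combine(List<Message> outgoing) {
        Map<Long, Double> combined = new HashMap<>();
        for (Message m : outgoing) {
            combined.merge(m.target, m.value, Math::min);
        }
        return combined;
    }
}
```

Given messages (2, 5.0), (2, 3.0) and (3, 7.0), `combine` returns two entries, 3.0 for vertex 2 and 7.0 for vertex 3, so one message fewer has to travel to vertex 2.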
+
+Let us consider computing Single-Source-Shortest-Paths with vertex-centric iterations. Initially, every vertex has a value of infinite distance. During the first superstep, the source vertex sets its own distance to zero and propagates distances to its neighbors. During the following supersteps, each vertex checks its received messages and chooses the minimum distance among them. If this distance is smaller than its current value, it updates its state and produces messages for its neighbors. If a vertex does not change its value during a superstep, then it does not produce any messages for its neighbors for the next superstep. The algorithm terminates when there are no value updates or the maximum number of supersteps has been reached. In this algorithm, a message combiner can be used to reduce the number of messages sent to a target vertex.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// read the input graph
+Graph<Long, Double, Double> graph = ...
+
+// define the maximum number of iterations
+int maxIterations = 10;
+
+// the ID of the source vertex
+long srcId = ...
+
+// Execute the vertex-centric iteration
+Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
+        new SSSPComputeFunction(srcId), new SSSPCombiner(), maxIterations);
+
+// Extract the vertices as the result
+DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+
+// - - -  UDFs - - - //
+
+public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
+
+    private final long srcId;
+
+    public SSSPComputeFunction(long srcId) {
+        this.srcId = srcId;
+    }
+
+    // Expects every vertex value, the source's included, to be initialized to
+    // Double.POSITIVE_INFINITY: the source then sets its distance to 0 in the first superstep.
+    @Override
+    public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
+
+        double minDistance = (vertex.getId().equals(srcId)) ? 0d : Double.POSITIVE_INFINITY;
+
+        for (Double msg : messages) {
+            minDistance = Math.min(minDistance, msg);
+        }
+
+        // update the value and notify the neighbors only if the distance improved
+        if (minDistance < vertex.getValue()) {
+            setNewVertexValue(minDistance);
+            for (Edge<Long, Double> e : getEdges()) {
+                sendMessageTo(e.getTarget(), minDistance + e.getValue());
+            }
+        }
+    }
+}
+
+// message combiner: only the smallest candidate distance per target vertex matters
+public static final class SSSPCombiner extends MessageCombiner<Long, Double> {
+
+    @Override
+    public void combineMessages(MessageIterator<Double> messages) {
+
+        double minMessage = Double.POSITIVE_INFINITY;
+        for (Double msg : messages) {
+            minMessage = Math.min(minMessage, msg);
+        }
+        sendCombinedMessage(minMessage);
+    }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import scala.collection.JavaConverters._
+
+// read the input graph
+val graph: Graph[Long, Double, Double] = ...
+
+// define the maximum number of iterations
+val maxIterations = 10
+
+// the ID of the source vertex
+val srcId: Long = ...
+
+// Execute the vertex-centric iteration
+val result = graph.runVertexCentricIteration(new SSSPComputeFunction(srcId), new SSSPCombiner, maxIterations)
+
+// Extract the vertices as the result
+val singleSourceShortestPaths = result.getVertices
+
+
+// - - -  UDFs - - - //
+
+// Expects every vertex value, the source's included, to be initialized to
+// Double.PositiveInfinity: the source then sets its distance to 0 in the first superstep.
+final class SSSPComputeFunction(srcId: Long) extends ComputeFunction[Long, Double, Double, Double] {
+
+  override def compute(vertex: Vertex[Long, Double], messages: MessageIterator[Double]): Unit = {
+
+    var minDistance = if (vertex.getId == srcId) 0.0 else Double.PositiveInfinity
+
+    while (messages.hasNext) {
+      val msg = messages.next
+      if (msg < minDistance) {
+        minDistance = msg
+      }
+    }
+
+    // update the value and notify the neighbors only if the distance improved
+    if (vertex.getValue > minDistance) {
+      setNewVertexValue(minDistance)
+      // getEdges returns a java.lang.Iterable, converted with asScala
+      for (edge <- getEdges.asScala) {
+        sendMessageTo(edge.getTarget, minDistance + edge.getValue)
+      }
+    }
+  }
+}
+
+// message combiner
+final class SSSPCombiner extends MessageCombiner[Long, Double] {
+
+  override def combineMessages(messages: MessageIterator[Double]): Unit = {
+
+    var minDistance = Double.PositiveInfinity
+
+    while (messages.hasNext) {
+      val msg = messages.next
+      if (msg < minDistance) {
+        minDistance = msg
+      }
+    }
+    sendCombinedMessage(minDistance)
+  }
+}
+{% endhighlight %}
+</div>
+</div>
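To follow the algorithm superstep by superstep without a Flink cluster, here is a single-threaded plain-Java sketch of the same logic. It is a simulation under stated assumptions, not Gelly's runtime, and `VertexCentricSssp`, `Edge` and `run` are hypothetical names. As in the `SSSPComputeFunction` above, every vertex, the source included, starts at infinity. The sketch assumes that only vertices which received messages are active after the first superstep, and it stops when no vertex changes its value or `maxSupersteps` is reached.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Single-threaded simulation of vertex-centric SSSP (hypothetical names, not Gelly's runtime).
// Assumes every edge target also appears as a key of outEdges.
final class VertexCentricSssp {

    static final class Edge {
        final long target;
        final double weight;

        Edge(long target, double weight) {
            this.target = target;
            this.weight = weight;
        }
    }

    static Map<Long, Double> run(Map<Long, List<Edge>> outEdges, long srcId, int maxSupersteps) {
        // Every vertex value, the source's included, starts at infinity.
        Map<Long, Double> value = new HashMap<>();
        for (long v : outEdges.keySet()) {
            value.put(v, Double.POSITIVE_INFINITY);
        }
        Map<Long, List<Double>> inbox = new HashMap<>();
        for (int superstep = 1; superstep <= maxSupersteps; superstep++) {
            Map<Long, List<Double>> outbox = new HashMap<>();
            for (long v : outEdges.keySet()) {
                List<Double> messages = inbox.getOrDefault(v, List.of());
                // Assumption of this sketch: after superstep 1 only vertices with messages run.
                if (superstep > 1 && messages.isEmpty()) {
                    continue;
                }
                // Same rule as the compute function: take the minimum candidate distance.
                double minDistance = (v == srcId) ? 0d : Double.POSITIVE_INFINITY;
                for (double msg : messages) {
                    minDistance = Math.min(minDistance, msg);
                }
                if (minDistance < value.get(v)) {
                    value.put(v, minDistance);
                    for (Edge e : outEdges.get(v)) {
                        outbox.computeIfAbsent(e.target, k -> new ArrayList<>()).add(minDistance + e.weight);
                    }
                }
            }
            if (outbox.isEmpty()) {
                break; // no vertex changed its value: converged
            }
            inbox = outbox; // messages become visible in the next superstep
        }
        return value;
    }
}
```

For edges (1, 2, 1.0), (1, 3, 4.0), (2, 3, 2.0), (3, 4, 1.0) and (2, 4, 6.0) given as (source, target, weight), `run(graph, 1L, 10)` returns distances 0, 1, 3 and 4 for vertices 1 to 4 after four supersteps; a vertex without edges keeps the value infinity.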
+
+{% top %}
+
+### Configuring a Vertex-Centric Iteration
+A vertex-centric iteration can be configured using a `VertexCentricConfiguration` object.
+Currently, the following parameters can be specified:
+
+* <strong>Name</strong>: The name for the vertex-centric iteration. The name is displayed in logs and messages
+and can be specified using the `setName()` method.
+
+* <strong>Parallelism</strong>: The parallelism for the iteration. It can be set using the `setParallelism()` method.
+
+* <strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink's internal way of keeping objects in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the `setSolutionSetUnmanagedMemory()` method.
+
+* <strong>Aggregators</strong>: Iteration aggregators can be registered using the `registerAggregator()` method. An iteration aggregator combines
+all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined `ComputeFunction`.
+
+* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables]({{site.baseurl}}/apis/batch/index.html#broadcast-variables) to the `ComputeFunction`, using the `addBroadcastSet()` method.
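The aggregator behavior described in the list above, where values aggregated during one superstep only become visible in the next, can be sketched in plain Java. `SuperstepSumAggregator` is hypothetical and does not implement Flink's `Aggregator` interface; reporting 0 before the first barrier is a simplification of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of a per-superstep sum aggregator (hypothetical, not Flink's Aggregator API).
final class SuperstepSumAggregator {

    private long current;  // partial sum collected during the running superstep
    private long previous; // global sum published at the last superstep barrier

    void aggregate(long value) {
        current += value;
    }

    long previousAggregate() {
        return previous;
    }

    /** Superstep barrier: publish the total and start a fresh partial sum. */
    void endSuperstep() {
        previous = current;
        current = 0;
    }

    /**
     * Runs the given number of supersteps; in superstep s every vertex aggregates value * s.
     * Returns, per superstep, the aggregate the vertices could read (the previous total).
     */
    static List<Long> demo(long[] vertexValues, int supersteps) {
        SuperstepSumAggregator aggregator = new SuperstepSumAggregator();
        List<Long> visible = new ArrayList<>();
        for (int s = 1; s <= supersteps; s++) {
            visible.add(aggregator.previousAggregate());
            for (long v : vertexValues) {
                aggregator.aggregate(v * s);
            }
            aggregator.endSuperstep();
        }
        return visible;
    }
}
```

With vertex values 1, 2 and 3 over three supersteps, `demo` returns [0, 6, 12]: the total aggregated in superstep s is read only in superstep s + 1.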
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+Graph<Long, Long, Double> graph = ...
+
+// define the maximum number of iterations
+int maxIterations = 10;
+
+// configure the iteration
+VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+// set the iteration name
+parameters.setName("Gelly Iteration");
+
+// set the parallelism
+parameters.setParallelism(16);
+
+// register an aggregator
+parameters.registerAggregator("sumAggregator", new LongSumAggregator());
+
+// run the vertex-centric iteration, also passing the configuration parameters
+// (the combiner argument is null, i.e. no message combiner is used)
+Graph<Long, Long, Double> result =
+        graph.runVertexCentricIteration(
+                new Compute(), null, maxIterations, parameters);
+
+// user-defined function
+public static final class Compute extends ComputeFunction<Long, Long, Double, Long> {
+
+    LongSumAggregator aggregator = new LongSumAggregator();
+
+    @Override
+    public void preSuperstep() {
+
+        // retrieve the Aggregator
+        aggregator = getIterationAggregator("sumAggregator");
+    }
+
+
+    @Override
+    public void compute(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+
+        //do some computation
+        Long partialValue = ...
+
+        // aggregate the partial value
+        aggregator.aggregate(partialValue);
+
+        // update the vertex value
+        setNewVertexValue(...);
+    }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val graph: Graph[Long, Long, Double] = ...
+
+// define the maximum number of iterations
+val maxIterations = 10
+
+val parameters = new VertexCentricConfiguration
+
+// set the iteration name
+parameters.setName("Gelly Iteration")
+
+// set the parallelism
+parameters.setParallelism(16)
+
+// register an aggregator
+parameters.registerAggregator("sumAggregator", new LongSumAggregator)
+
+// run the vertex-centric iteration, also passing the configuration parameters
+// (the combiner argument is null, i.e. no message combiner is used)
+val result = graph.runVertexCentricIteration(new Compute, null, maxIterations, parameters)
+
+// user-defined function
+final class Compute extends ComputeFunction[Long, Long, Double, Long] {
+
+  var aggregator = new LongSumAggregator
+
+  override def preSuperstep(): Unit = {
+
+    // retrieve the Aggregator
+    aggregator = getIterationAggregator[LongSumAggregator]("sumAggregator")
+  }
+
+
+  override def compute(vertex: Vertex[Long, Long], inMessages: MessageIterator[Long]): Unit = {
+
+    //do some computation
+    val partialValue = ...
+
+    // aggregate the partial value
+    aggregator.aggregate(partialValue)
+
+    // update the vertex value
+    setNewVertexValue(...)
+  }
+}
+
+
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
### Scatter-Gather Iterations
The scatter-gather model, also known as "signal/collect" model, expresses computation from the perspective of a vertex in the graph. The computation proceeds in synchronized iteration steps, called supersteps. In each superstep, a vertex produces messages for other vertices and updates its value based on the messages it receives. To use scatter-gather iterations in Gelly, the user only needs to define how a vertex behaves in each superstep:
@@ -810,10 +1059,6 @@ Let us consider computing Single-Source-Shortest-Paths with scatter-gather itera
<img alt="Scatter-gather SSSP superstep 1" width="70%" src="fig/gelly-vc-sssp1.png"/>
</p>
<p class="text-center">
 <img alt="Scatter-gather SSSP superstep 2" width="70%" src="fig/gelly-vc-sssp2.png"/>
</p>

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
@@ -1184,16 +1429,7 @@ Let us consider computing Single-Source-Shortest-Paths with GSA on the following
<img alt="GSA SSSP superstep 1" width="70%" src="fig/gelly-gsa-sssp1.png"/>
</p>
<p class="text-center">
 <img alt="GSA SSSP superstep 2" width="70%" src="fig/gelly-gsa-sssp2.png"/>
</p>

Notice that, if a vertex does not change its value during a superstep, it will not calculate candidate distance during the next superstep. The algorithm converges when no vertex changes value.
The resulting graph after the algorithm converges is shown below.

<p class="text-center">
 <img alt="GSA SSSP result" width="70%" src="fig/gelly-gsa-sssp-result.png"/>
</p>
To implement this example in Gelly GSA, the user only needs to call the `runGatherSumApplyIteration` method on the input graph and provide the `GatherFunction`, `SumFunction` and `ApplyFunction` UDFs. Iteration synchronization, grouping, value updates and convergence are handled by the system:
@@ -1442,16 +1678,58 @@ val result = graph.runGatherSumApplyIteration(new Gather, new Sum, new Apply, ma
</div>
{% top %}
-### Scatter-Gather and GSA Comparison
-As seen in the examples above, Gather-Sum-Apply iterations are quite similar to scatter-gather iterations. In fact, any algorithm which can be expressed as a GSA iteration can also be written in the scatter-gather model.
-The messaging phase of the scatter-gather model is equivalent to the Gather and Sum steps of GSA: Gather can be seen as the phase where the messages are produced and Sum as the phase where they are routed to the target vertex. Similarly, the value update phase corresponds to the Apply step.
+### Iteration Abstractions Comparison
+Although the three iteration abstractions in Gelly seem quite similar, understanding their differences can lead to more performant and maintainable programs.
+Among the three, the vertex-centric model is the most general model and supports arbitrary computation and messaging for each vertex. In the scatter-gather model, the logic of producing messages is decoupled from the logic of updating vertex values. Thus, programs written using scatter-gather are sometimes easier to follow and maintain.
+Separating the messaging phase from the vertex value update logic not only makes some programs easier to follow but might also have a positive impact on performance. Scatter-gather implementations typically have lower memory requirements, because concurrent access to the inbox (messages received) and outbox (messages to send) data structures is not required. However, this characteristic also limits expressiveness and makes some computation patterns non-intuitive. Naturally, if an algorithm requires a vertex to concurrently access its inbox and outbox, then the expression of this algorithm in scatter-gather might be problematic. Strongly Connected Components and Approximate Maximum Weight Matching are examples of such graph algorithms. A direct consequence of this restriction is that vertices cannot generate messages and update their states in the same phase. Thus, deciding whether to propagate a message based on its content would require storing it in the vertex value, so that the scatter phase of the following superstep has access to it. Similarly, if the vertex update logic includes computation over the values of the neighboring edges, these have to be included inside a special message passed from the scatter to the gather phase. Such workarounds often lead to higher memory requirements and inelegant, hard-to-understand algorithm implementations.
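The decoupling described above can be sketched for SSSP in plain Java, with scatter and gather as two separate loops. The scatter loop reads only vertex values to produce messages, and the gather loop reads only messages to update values, so the two buffers are never accessed together. This is not Gelly's scatter-gather API; `ScatterGatherSssp`, `Edge` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch of scatter-gather SSSP (hypothetical names, not Gelly's API).
// Assumes every edge target also appears as a key of outEdges.
final class ScatterGatherSssp {

    static final class Edge {
        final long target;
        final double weight;

        Edge(long target, double weight) {
            this.target = target;
            this.weight = weight;
        }
    }

    static Map<Long, Double> run(Map<Long, List<Edge>> outEdges, long srcId, int maxSupersteps) {
        Map<Long, Double> value = new HashMap<>();
        for (long v : outEdges.keySet()) {
            value.put(v, v == srcId ? 0d : Double.POSITIVE_INFINITY);
        }
        // In the first superstep every vertex may scatter.
        Set<Long> updated = new HashSet<>(outEdges.keySet());
        for (int s = 1; s <= maxSupersteps && !updated.isEmpty(); s++) {
            // Scatter: produce messages from vertex state only.
            Map<Long, List<Double>> inbox = new HashMap<>();
            for (long v : updated) {
                double distance = value.get(v);
                if (distance == Double.POSITIVE_INFINITY) {
                    continue; // nothing useful to propagate yet
                }
                for (Edge e : outEdges.get(v)) {
                    inbox.computeIfAbsent(e.target, k -> new ArrayList<>()).add(distance + e.weight);
                }
            }
            // Gather: update each vertex from its received messages only.
            Set<Long> next = new HashSet<>();
            for (Map.Entry<Long, List<Double>> entry : inbox.entrySet()) {
                double min = Double.POSITIVE_INFINITY;
                for (double m : entry.getValue()) {
                    min = Math.min(min, m);
                }
                if (min < value.get(entry.getKey())) {
                    value.put(entry.getKey(), min);
                    next.add(entry.getKey());
                }
            }
            updated = next;
        }
        return value;
    }
}
```

For edges (1, 2, 1.0), (1, 3, 4.0), (2, 3, 2.0), (3, 4, 1.0) and (2, 4, 6.0) given as (source, target, weight) with source 1, `run` returns distances 0, 1, 3 and 4 for vertices 1 to 4.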
+
+Gather-sum-apply iterations are also quite similar to scatter-gather iterations. In fact, any algorithm which can be expressed as a GSA iteration can also be written in the scatter-gather model. The messaging phase of the scatter-gather model is equivalent to the Gather and Sum steps of GSA: Gather can be seen as the phase where the messages are produced and Sum as the phase where they are routed to the target vertex. Similarly, the value update phase corresponds to the Apply step.
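The correspondence between the phases can be sketched for SSSP in plain Java: Gather computes one candidate distance per edge, Sum reduces the candidates of each target with `min`, and Apply keeps a candidate only if it improves the vertex value. `GsaSssp`, `Edge` and `run` are hypothetical names, not Gelly's GSA API, and for simplicity the sketch evaluates every edge in every superstep instead of skipping vertices whose value did not change.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of GSA-style SSSP (hypothetical names, not Gelly's API).
// Assumes initial contains a value for every edge endpoint.
final class GsaSssp {

    static final class Edge {
        final long source;
        final long target;
        final double weight;

        Edge(long source, long target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    static Map<Long, Double> run(List<Edge> edges, Map<Long, Double> initial, int maxSupersteps) {
        Map<Long, Double> value = new HashMap<>(initial);
        for (int s = 1; s <= maxSupersteps; s++) {
            // Gather + Sum: one candidate per edge (independent, so parallelizable over edges),
            // reduced per target vertex with min.
            Map<Long, Double> sum = new HashMap<>();
            for (Edge e : edges) {
                double candidate = value.get(e.source) + e.weight;
                sum.merge(e.target, candidate, Math::min);
            }
            // Apply: keep the reduced candidate only if it improves the vertex value.
            boolean changed = false;
            for (Map.Entry<Long, Double> entry : sum.entrySet()) {
                if (entry.getValue() < value.get(entry.getKey())) {
                    value.put(entry.getKey(), entry.getValue());
                    changed = true;
                }
            }
            if (!changed) {
                break; // converged: no vertex changed its value
            }
        }
        return value;
    }
}
```

For edges (1, 2, 1.0), (1, 3, 4.0), (2, 3, 2.0), (3, 4, 1.0) and (2, 4, 6.0), with vertex 1 initialized to 0 and all others to infinity, `run` returns distances 0, 1, 3 and 4 for vertices 1 to 4.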
The main difference between the two implementations is that the Gather phase of GSA parallelizes the computation over the edges, while the messaging phase distributes the computation over the vertices. Using the SSSP examples above, we see that in the first superstep of the scatter-gather case, vertices 1, 2 and 3 produce messages in parallel. Vertex 1 produces 3 messages, while vertices 2 and 3 produce one message each. In the GSA case on the other hand, the computation is parallelized over the edges: the three candidate distance values of vertex 1 are produced in parallel. Thus, if the Gather step contains "heavy" computation, it might be a better idea to use GSA and spread out the computation, instead of burdening a single vertex. Another case when parallelizing over the edges might prove to be more efficient is when the input graph is skewed (some vertices have a lot more neighbors than others).
Another difference between the two implementations is that the scatter-gather implementation uses a `coGroup` operator internally, while GSA uses a `reduce`. Therefore, if the function that combines neighbor values (messages) requires the whole group of values for the computation, scatter-gather should be used. If the update function is associative and commutative, then the GSA's reducer is expected to give a more efficient implementation, as it can make use of a combiner.
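The reason associativity and commutativity matter can be shown with a small plain-Java experiment. A combiner applies the reduce function to partial groups and then to the partial results; for `min` this gives the same answer as reducing the whole group at once, but for a function that needs the whole group, such as the median, it does not. `CombinableCheck` and its methods are hypothetical and model the idea only, not Flink's `coGroup` or `reduce` operators.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch: does reducing partial groups first (as a combiner does) preserve the result?
// CombinableCheck is hypothetical and only models the idea, not Flink's operators.
final class CombinableCheck {

    /** Associative and commutative: safe to pre-combine. */
    static double min(List<Double> values) {
        double result = Double.POSITIVE_INFINITY;
        for (double v : values) {
            result = Math.min(result, v);
        }
        return result;
    }

    /** Needs the whole group at once: not safe to pre-combine. */
    static double median(List<Double> values) {
        List<Double> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int n = sorted.size();
        return n % 2 == 1 ? sorted.get(n / 2) : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    /** Applies f to each partition, then to the partial results. */
    static double combineThenReduce(List<List<Double>> partitions, Function<List<Double>, Double> f) {
        List<Double> partials = new ArrayList<>();
        for (List<Double> partition : partitions) {
            partials.add(f.apply(partition));
        }
        return f.apply(partials);
    }
}
```

With partitions [1, 2, 9] and [3, 4], `combineThenReduce` yields 1.0 for `min`, the true minimum, but 2.75 for `median`, while the true median of all five values is 3.0.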
-Another thing to note is that GSA works strictly on neighborhoods, while in the scatter-gather model, a vertex can send a message to any vertex, given that it knows its vertex ID, regardless of whether it is a neighbor.
-Finally, in Gelly's scatter-gather implementation, one can choose the messaging direction, i.e. the direction in which updates propagate. GSA does not support this yet, so each vertex will be updated based on the values of its in-neighbors only.
+Another thing to note is that GSA works strictly on neighborhoods, while in the vertex-centric and scatter-gather models, a vertex can send a message to any vertex, as long as it knows the target's vertex ID, regardless of whether it is a neighbor. Finally, in Gelly's scatter-gather implementation, one can choose the messaging direction, i.e. the direction in which updates propagate. GSA does not support this yet, so each vertex will be updated based on the values of its in-neighbors only.
+
+The main differences among the Gelly iteration models are shown in the table below.
+
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Iteration Model</th>
+      <th class="text-center">Update Function</th>
+      <th class="text-center">Update Logic</th>
+      <th class="text-center">Communication Scope</th>
+      <th class="text-center">Communication Logic</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>Vertex-Centric</td>
+      <td>arbitrary</td>
+      <td>arbitrary</td>
+      <td>any vertex</td>
+      <td>arbitrary</td>
+    </tr>
+    <tr>
+      <td>Scatter-Gather</td>
+      <td>arbitrary</td>
+      <td>based on received messages</td>
+      <td>any vertex</td>
+      <td>based on vertex state</td>
+    </tr>
+    <tr>
+      <td>Gather-Sum-Apply</td>
+      <td>associative and commutative</td>
+      <td>based on neighbors' values</td>
+      <td>neighborhood</td>
+      <td>based on vertex state</td>
+    </tr>
+  </tbody>
+</table>
+
Graph Validation

@@ -1581,12 +1859,12 @@ This is an implementation of the well-known Label Propagation algorithm describe
The algorithm is implemented using [scatter-gather iterations](#scatter-gather-iterations).
Labels are expected to be of type `Comparable` and are initialized using the vertex values of the input `Graph`.
The algorithm iteratively refines discovered communities by propagating labels. In each iteration, a vertex adopts
-the label that is most frequent among its neighbors' labels. In case of a tie (i.e. two or more labels appear with the
-same frequency), the algorithm picks the greater label. The algorithm converges when no vertex changes its value or
+the label that is most frequent among its neighbors' labels. In case of a tie (i.e. two or more labels appear with the
+same frequency), the algorithm picks the greater label. The algorithm converges when no vertex changes its value or
the maximum number of iterations has been reached. Note that different initializations might lead to different results.
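The label selection rule can be sketched in plain Java. `LabelChoice.mostFrequent` is a hypothetical helper, not Gelly's Label Propagation code: it counts the neighbors' labels and breaks ties in favor of the greater label, using the labels' natural `Comparable` order.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the label choice in Label Propagation (hypothetical helper, not Gelly code).
final class LabelChoice {

    /** Returns the most frequent label; ties go to the greater label; null if there are no labels. */
    static <L extends Comparable<L>> L mostFrequent(List<L> neighborLabels) {
        Map<L, Integer> counts = new HashMap<>();
        for (L label : neighborLabels) {
            counts.merge(label, 1, Integer::sum);
        }
        L best = null;
        int bestCount = 0;
        for (Map.Entry<L, Integer> entry : counts.entrySet()) {
            int count = entry.getValue();
            // best is non-null whenever count == bestCount, because every count is at least 1
            if (count > bestCount || (count == bestCount && entry.getKey().compareTo(best) > 0)) {
                best = entry.getKey();
                bestCount = count;
            }
        }
        return best;
    }
}
```

For neighbor labels [1, 2, 2, 3, 3], labels 2 and 3 both appear twice, so the sketch returns 3; with no neighbors it returns null.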
#### Usage
-The algorithm takes as input a `Graph` with a `Comparable` vertex type, a `Comparable` vertex value type and an arbitrary edge value type.
+The algorithm takes as input a `Graph` with a `Comparable` vertex type, a `Comparable` vertex value type and an arbitrary edge value type.
It returns a `DataSet` of vertices, where the vertex value corresponds to the community in which this vertex belongs after convergence.
The constructor takes one parameter:
http://git-wip-us.apache.org/repos/asf/flink/blob/048cda72/docs/libs/fig/vertex-centric supersteps.png

diff --git a/docs/libs/fig/vertex-centric supersteps.png b/docs/libs/fig/vertex-centric supersteps.png
deleted file mode 100644
index 6498a25..0000000
Binary files a/docs/libs/fig/vertex-centric supersteps.png and /dev/null differ
