Documenting Pregel API
Project: http://gitwipus.apache.org/repos/asf/incubatorspark/repo
Commit: http://gitwipus.apache.org/repos/asf/incubatorspark/commit/c787ff56
Tree: http://gitwipus.apache.org/repos/asf/incubatorspark/tree/c787ff56
Diff: http://gitwipus.apache.org/repos/asf/incubatorspark/diff/c787ff56
Branch: refs/heads/master
Commit: c787ff5640ad9d6f6dc3b744d73a1cb0c91eb90a
Parents: 7a4bb86
Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>
Authored: Sun Jan 12 20:49:41 2014 0800
Committer: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>
Committed: Sun Jan 12 20:49:52 2014 0800

docs/graphxprogrammingguide.md | 199 +++++++++++++++++++++++++++++++++
1 file changed, 198 insertions(+), 1 deletion(-)

http://gitwipus.apache.org/repos/asf/incubatorspark/blob/c787ff56/docs/graphxprogrammingguide.md

diff --git a/docs/graphxprogrammingguide.md b/docs/graphxprogrammingguide.md
index 22feccb..8975941 100644
--- a/docs/graphxprogrammingguide.md
+++ b/docs/graphxprogrammingguide.md
@@ -429,12 +429,209 @@ val joinedGraph = graph.joinVertices(uniqueCosts,
{% endhighlight %}
-## Map Reduce Triplets (mapReduceTriplets)
+## Neighborhood Aggregation
+
+A key part of graph computation is aggregating information about the neighborhood of each vertex.
+For example, we might want to know the number of followers each user has or the average age of
+the followers of each user. Many iterative graph algorithms (e.g., PageRank, shortest path, and
+connected components) repeatedly aggregate properties of neighboring vertices (e.g., current
+PageRank value, shortest path to the source, and smallest reachable vertex id).
+
+### Map Reduce Triplets (mapReduceTriplets)
<a name="mrTriplets"></a>
+[Graph.mapReduceTriplets]: api/graphx/index.html#mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=>Iterator[(org.apache.spark.graphx.VertexID,A)],reduceFunc:(A,A)=>A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
+
+The core (heavily optimized) aggregation primitive in GraphX is the
+[`mapReduceTriplets`][Graph.mapReduceTriplets] operator:
+
+{% highlight scala %}
+def mapReduceTriplets[A](
+ map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ reduce: (A, A) => A)
+ : VertexRDD[A]
+{% endhighlight %}
+
+The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user-defined map function
+that is applied to each triplet and can yield *messages* destined to either, both, or neither
+of the vertices in the triplet. We currently only support messages destined to the source or
+destination vertex of the triplet to enable optimized pre-aggregation. The user-defined
+`reduce` function combines the messages destined to each vertex. The `mapReduceTriplets`
+operator returns a `VertexRDD[A]` containing the aggregate message for each vertex. Vertices
+that do not receive a message are not included in the returned `VertexRDD`.
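The map/reduce semantics described above can be illustrated with a small, single-machine model in plain Scala. This is only a sketch and not the GraphX implementation: `LocalMapReduceTriplets`, its `Triplet` case class (a stand-in for `EdgeTriplet`), and the sample data are hypothetical names introduced for illustration, and a `Map` plays the role of the returned `VertexRDD[A]`.

```scala
// Local, single-machine model of the map/reduce semantics; NOT the GraphX implementation.
object LocalMapReduceTriplets {
  type VertexID = Long

  // Hypothetical stand-in for EdgeTriplet: an edge together with both endpoint attributes.
  final case class Triplet[VD, ED](
      srcId: VertexID, srcAttr: VD, dstId: VertexID, dstAttr: VD, attr: ED)

  def mapReduceTriplets[VD, ED, A](
      triplets: Seq[Triplet[VD, ED]],
      map: Triplet[VD, ED] => Iterator[(VertexID, A)],
      reduce: (A, A) => A): Map[VertexID, A] =
    triplets
      .flatMap(t => map(t).toList)      // map phase: zero or more messages per triplet
      .groupBy { case (vid, _) => vid } // route each message to its target vertex
      .map { case (vid, msgs) => vid -> msgs.map(_._2).reduce(reduce) } // reduce phase

  // Three users; an edge src -> dst means "src follows dst". The vertex attribute is an age.
  val triplets: Seq[Triplet[Double, Unit]] = Seq(
    Triplet(1L, 30.0, 2L, 25.0, ()),
    Triplet(3L, 40.0, 2L, 25.0, ()),
    Triplet(2L, 25.0, 3L, 40.0, ()))

  // Count followers: every edge sends the message 1 to its destination vertex.
  val followerCounts: Map[VertexID, Int] =
    mapReduceTriplets[Double, Unit, Int](triplets, t => Iterator((t.dstId, 1)), _ + _)
}
```

In this model vertex 1 has no followers, so it receives no message and does not appear in `followerCounts`, mirroring the behavior of the returned `VertexRDD`.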
+
+> Note that `mapReduceTriplets` takes an additional optional `activeSet` (see API docs) which
+> restricts the map phase to edges adjacent to the vertices in the provided `VertexRDD`.
+> Restricting computation to triplets adjacent to a subset of the vertices is often necessary in
+> incremental iterative computation and is a key part of the GraphX implementation of Pregel.
+
+We can use the `mapReduceTriplets` operator to collect information about adjacent vertices. For
+example, to compute the average age of each user's followers who are older than that user, we
+could do the following:
+
+{% highlight scala %}
+// Graph with age as the vertex property
+val graph: Graph[Double, String] = getFromSomewhereElse()
+// Compute the number of older followers and their total age
+val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
+  triplet => { // Map Function
+    if (triplet.srcAttr > triplet.dstAttr) {
+      // Send message to destination vertex containing counter and age
+      Iterator((triplet.dstId, (1, triplet.srcAttr)))
+    } else {
+      // Don't send a message for this triplet
+      Iterator.empty
+    }
+  },
+  // Add counter and age
+  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
+)
+// Divide total age by number of older followers to get average age of older followers
+val avgAgeOlderFollowers: VertexRDD[Double] =
+  olderFollowers.mapValues { case (count, totalAge) => totalAge / count }
+{% endhighlight %}
+
+> Note that the `mapReduceTriplets` operation performs optimally when the messages (and their
+> sums) are constant-sized (e.g., floats and addition instead of lists and concatenation). More
+> precisely, the size of the result of `mapReduceTriplets` should be sublinear in the degree of
+> each vertex.
+
+Because it is often necessary to aggregate information about neighboring vertices, we also
+provide an alternative interface defined in [`GraphOps`][GraphOps]:
+
+{% highlight scala %}
+def aggregateNeighbors[A](
+ map: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
+ reduce: (A, A) => A,
+ edgeDir: EdgeDirection)
+ : VertexRDD[A]
+{% endhighlight %}
+
+The `aggregateNeighbors` operator is implemented directly on top of `mapReduceTriplets` but
+allows the user to define the logic in a more vertex-centric manner. Here the `map` function
+is given the vertex to which the message is sent as well as one of its adjacent edges, and
+returns the optional message value. The `edgeDir` determines whether the `map` function is run
+on `In`, `Out`, or `All` edges adjacent to each vertex.
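One way such a vertex-centric operator could be layered on a triplet-level map/reduce is sketched below in plain Scala. It is a local model under stated assumptions, not the GraphX code: `LocalAggregateNeighbors`, `Triplet`, the `Direction` objects (`In`, `Out`, `All`, named after the text above), and the sample data are all hypothetical.

```scala
// Local model of a vertex-centric aggregation built on a triplet map/reduce.
// NOT the GraphX implementation; every name here is illustrative.
object LocalAggregateNeighbors {
  type VertexID = Long

  final case class Triplet[VD, ED](
      srcId: VertexID, srcAttr: VD, dstId: VertexID, dstAttr: VD, attr: ED)

  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  def mapReduceTriplets[VD, ED, A](
      triplets: Seq[Triplet[VD, ED]],
      map: Triplet[VD, ED] => Iterator[(VertexID, A)],
      reduce: (A, A) => A): Map[VertexID, A] =
    triplets
      .flatMap(t => map(t).toList)
      .groupBy { case (vid, _) => vid }
      .map { case (vid, msgs) => vid -> msgs.map(_._2).reduce(reduce) }

  def aggregateNeighbors[VD, ED, A](
      triplets: Seq[Triplet[VD, ED]],
      map: (VertexID, Triplet[VD, ED]) => Option[A],
      reduce: (A, A) => A,
      edgeDir: Direction): Map[VertexID, A] = {
    val toDst = edgeDir != Out // an in-edge of dst: dst is the receiving vertex
    val toSrc = edgeDir != In  // an out-edge of src: src is the receiving vertex
    val send: Triplet[VD, ED] => Iterator[(VertexID, A)] = { t =>
      val dstMsg = if (toDst) map(t.dstId, t).map(m => (t.dstId, m)) else None
      val srcMsg = if (toSrc) map(t.srcId, t).map(m => (t.srcId, m)) else None
      (dstMsg.toList ++ srcMsg.toList).iterator
    }
    mapReduceTriplets(triplets, send, reduce)
  }

  // An edge src -> dst means "src follows dst"; the vertex attribute is an age.
  val triplets: Seq[Triplet[Double, Unit]] = Seq(
    Triplet(1L, 30.0, 2L, 25.0, ()),
    Triplet(3L, 40.0, 2L, 25.0, ()),
    Triplet(2L, 25.0, 3L, 40.0, ()))

  // Age of the oldest follower of each user: look only at in-edges.
  val oldestFollower: Map[VertexID, Double] =
    aggregateNeighbors[Double, Unit, Double](
      triplets, (_, t) => Some(t.srcAttr), (a, b) => math.max(a, b), In)
}
```

The user-supplied `map` never has to decide where a message goes; the wrapper picks the receiving vertex from the edge direction and only asks for the optional message value.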
+
+### Computing Degree Information
+
+A common aggregation task is computing the degree of each vertex: the number of edges adjacent
+to each vertex. In the context of directed graphs it is often necessary to know the in-degree,
+out-degree, and total degree of each vertex. The [`GraphOps`][GraphOps] class contains a
+collection of operators to compute the degrees of each vertex. For example, in the following
+we compute the max in-, out-, and total degrees:
+
+{% highlight scala %}
+// Define a reduce operation to compute the highest degree vertex
+def maxReduce(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
+ if (a._2 > b._2) a else b
+}
+// Compute the max degrees
+val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(maxReduce)
+val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(maxReduce)
+val maxDegrees: (VertexId, Int) = graph.degrees.reduce(maxReduce)
+{% endhighlight %}
+
+
+### Collecting Neighbors
+
+In some cases it may be easier to express computation by collecting neighboring vertices and
+their attributes at each vertex. This can be easily accomplished using the
+`collectNeighborIds` and `collectNeighbors` operators.
+
+{% highlight scala %}
+def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
+def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
+{% endhighlight %}
+
+> Note that these operators can be quite costly as they duplicate information and require
+> substantial communication. If possible try expressing the same computation using the
+> `mapReduceTriplets` operator directly.
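To make the cost difference concrete, here is a small plain-Scala comparison, offered as a sketch rather than as GraphX code. The object `CollectVersusAggregate` and its sample data are hypothetical. Both approaches compute the age of each user's oldest follower, but the first materializes every neighbor at each vertex while the second keeps a single constant-size value per vertex.

```scala
// Illustrative local comparison; all names and data are hypothetical.
object CollectVersusAggregate {
  type VertexID = Long

  // Follower edges as (follower, followed) pairs, plus an age per user.
  val edges: Seq[(VertexID, VertexID)] = Seq((1L, 2L), (3L, 2L), (2L, 3L))
  val age: Map[VertexID, Double] = Map(1L -> 30.0, 2L -> 25.0, 3L -> 40.0)

  // Collect-style: copy every in-neighbor's (id, attribute) to the vertex, then compute.
  val collected: Map[VertexID, Seq[(VertexID, Double)]] =
    edges.groupBy(_._2).map { case (dst, es) =>
      dst -> es.map { case (src, _) => (src, age(src)) }
    }
  val oldestViaCollect: Map[VertexID, Double] =
    collected.map { case (vid, nbrs) =>
      vid -> nbrs.map(_._2).reduce((a, b) => math.max(a, b))
    }

  // Aggregate-style: each edge contributes one Double, merged eagerly with max,
  // so the state kept per vertex does not grow with its degree.
  val oldestViaAggregate: Map[VertexID, Double] =
    edges.foldLeft(Map.empty[VertexID, Double]) { case (acc, (src, dst)) =>
      val candidate = age(src)
      acc.updated(dst, acc.get(dst).fold(candidate)(cur => math.max(cur, candidate)))
    }
}
```

Both values agree on this data; the difference lies only in how much intermediate state is built per vertex.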
+
# Pregel API
<a name="pregel"></a>
+Graphs are inherently recursive data structures, as properties of vertices depend on properties
+of their neighbors, which in turn depend on properties of the neighbors of their neighbors. As
+a consequence, many important graph algorithms iteratively recompute the properties of each
+vertex until a fixed-point condition is reached. A range of graph-parallel abstractions have
+been proposed to express these iterative algorithms. GraphX exposes a Pregel operator which is
+a fusion of the widely used Pregel and GraphLab abstractions.
+
+At a high level the GraphX variant of the Pregel abstraction is a bulk-synchronous parallel
+messaging abstraction constrained to the topology of the graph. The Pregel operator executes
+in a series of supersteps in which vertices receive the sum of their inbound messages from the
+previous superstep, compute a new property value, and then send messages to neighboring
+vertices in the next superstep. Vertices that do not receive a message are skipped within a
+superstep. The Pregel operator terminates iteration and returns the final graph when there are
+no messages remaining.
+
+> Note, unlike more standard Pregel implementations, vertices in GraphX can only send messages
+> to neighboring vertices and the message construction is done in parallel using a user-defined
+> messaging function. These constraints allow additional optimization within GraphX.
+
+The following is the type signature of the Pregel operator as well as a *sketch* of its
+implementation (note that calls to `graph.cache` have been removed):
+
+{% highlight scala %}
+def pregel[A]
+    (initialMsg: A,
+     maxIter: Int = Int.MaxValue,
+     activeDir: EdgeDirection = EdgeDirection.Out)
+    (vprog: (VertexID, VD, A) => VD,
+     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+     mergeMsg: (A, A) => A)
+  : Graph[VD, ED] = {
+  // Receive the initial message at each vertex
+  var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
+  // Compute the messages
+  var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
+  var activeMessages = messages.count()
+  // Loop until no messages remain or maxIter is reached
+  var i = 0
+  while (activeMessages > 0 && i < maxIter) {
+    // Receive the messages:
+    // Run the vertex program on all vertices that receive messages
+    val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
+    // Merge the new vertex values back into the graph
+    g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
+    // Send Messages:
+    // Vertices that didn't receive a message above don't appear in newVerts and therefore
+    // don't get to send messages. More precisely the map phase of mapReduceTriplets is only
+    // invoked on edges in the activeDir of vertices in newVerts
+    messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
+    activeMessages = messages.count()
+    i += 1
+  }
+  g
+}
+{% endhighlight %}
+
+Notice that Pregel takes two argument lists (i.e., `graph.pregel(list1)(list2)`). The first
+argument list contains configuration parameters including the initial message, the maximum
+number of iterations, and the edge direction in which to send messages (by default along out
+edges). The second argument list contains the user-defined functions for receiving messages
+(the vertex program `vprog`), computing messages (`sendMsg`), and combining messages
+(`mergeMsg`).
+
+We can use the Pregel operator to express computations such as single-source shortest path, as
+in the following example.
+
+{% highlight scala %}
+val graph: Graph[String, Double] // A graph with edge attributes containing distances
+val sourceId: VertexId = 42 // The ultimate source
+// Initialize the graph such that all vertices except the root have distance infinity.
+val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
+val sssp = initialGraph.pregel(Double.PositiveInfinity)(
+  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
+  triplet => { // Send Message
+    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
+      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
+    } else {
+      Iterator.empty
+    }
+  },
+  (a, b) => math.min(a, b) // Merge Message
+)
+{% endhighlight %}
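To trace how the supersteps unfold, the loop can be modeled on a single machine in plain Scala. The following is a simplified sketch and not the GraphX operator: `LocalPregelSssp`, `Edge`, `Triplet`, and the four-vertex sample graph are hypothetical, vertex values are fixed to `Double`, and only messages along out-edges of updated vertices are modeled (the default direction described above).

```scala
// Single-machine model of the Pregel loop applied to shortest paths.
// NOT the GraphX implementation; names and data are illustrative only.
object LocalPregelSssp {
  type VertexID = Long
  final case class Edge(srcId: VertexID, dstId: VertexID, attr: Double)
  final case class Triplet(
      srcId: VertexID, srcAttr: Double, dstId: VertexID, dstAttr: Double, attr: Double)

  def pregel[A](
      vertices: Map[VertexID, Double],
      edges: Seq[Edge],
      initialMsg: A,
      maxIter: Int = Int.MaxValue)(
      vprog: (VertexID, Double, A) => Double,
      sendMsg: Triplet => Iterator[(VertexID, A)],
      mergeMsg: (A, A) => A): Map[VertexID, Double] = {
    // One map/reduce round restricted to out-edges of the active vertices.
    def round(g: Map[VertexID, Double], active: Set[VertexID]): Map[VertexID, A] =
      edges
        .filter(e => active(e.srcId))
        .flatMap(e => sendMsg(Triplet(e.srcId, g(e.srcId), e.dstId, g(e.dstId), e.attr)).toList)
        .groupBy(_._1)
        .map { case (vid, msgs) => vid -> msgs.map(_._2).reduce(mergeMsg) }

    // Every vertex receives the initial message.
    var g = vertices.map { case (vid, attr) => vid -> vprog(vid, attr, initialMsg) }
    var messages = round(g, g.keySet)
    var i = 0
    while (messages.nonEmpty && i < maxIter) {
      // Only vertices that received a message run the vertex program.
      val newVerts = messages.collect {
        case (vid, msg) if g.contains(vid) => vid -> vprog(vid, g(vid), msg)
      }
      g = g ++ newVerts
      // Only the vertices updated in this superstep get to send messages.
      messages = round(g, newVerts.keySet)
      i += 1
    }
    g
  }

  val sourceId: VertexID = 1L
  val exampleEdges = Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0))
  val initialGraph: Map[VertexID, Double] =
    Seq(1L, 2L, 3L, 4L).map(id => id -> (if (id == sourceId) 0.0 else Double.PositiveInfinity)).toMap

  val sssp: Map[VertexID, Double] = pregel(initialGraph, exampleEdges, Double.PositiveInfinity)(
    (id, dist, newDist) => math.min(dist, newDist),
    t => if (t.srcAttr + t.attr < t.dstAttr) Iterator((t.dstId, t.srcAttr + t.attr)) else Iterator.empty,
    (a, b) => math.min(a, b))
}
```

On this sample graph the first round proposes distance 5.0 for vertex 3 over the direct edge, the next superstep improves it to 3.0 via vertex 2, and the loop stops once no messages remain. Vertex 4 has no incoming edges and keeps its infinite distance.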
+
# Graph Builders
<a name="graph_builders"></a>
