spark-commits mailing list archives

Subject [06/50] git commit: Documenting Pregel API
Date Tue, 14 Jan 2014 06:59:30 GMT
Documenting Pregel API


Branch: refs/heads/master
Commit: c787ff5640ad9d6f6dc3b744d73a1cb0c91eb90a
Parents: 7a4bb86
Author: Joseph E. Gonzalez <>
Authored: Sun Jan 12 20:49:41 2014 -0800
Committer: Joseph E. Gonzalez <>
Committed: Sun Jan 12 20:49:52 2014 -0800

 docs/ | 199 +++++++++++++++++++++++++++++++++-
 1 file changed, 198 insertions(+), 1 deletion(-)
diff --git a/docs/ b/docs/
index 22feccb..8975941 100644
--- a/docs/
+++ b/docs/
@@ -429,12 +429,209 @@ val joinedGraph = graph.joinVertices(uniqueCosts,
 {% endhighlight %}
-## Map Reduce Triplets (mapReduceTriplets)
+## Neighborhood Aggregation
+A key part of graph computation is aggregating information about the neighborhood of each vertex.
+For example, we might want to know the number of followers each user has or the average age of
+the followers of each user.  Many iterative graph algorithms (e.g., PageRank, shortest path, and
+connected components) repeatedly aggregate properties of neighboring vertices (e.g., current
+PageRank value, shortest path to the source, and smallest reachable vertex id).
+### Map Reduce Triplets (mapReduceTriplets)
 <a name="mrTriplets"></a>
+[Graph.mapReduceTriplets]: api/graphx/index.html#mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=&gt;Iterator[(org.apache.spark.graphx.VertexID,A)],reduceFunc:(A,A)=&gt;A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
+The core (heavily optimized) aggregation primitive in GraphX is the
+[`mapReduceTriplets`][Graph.mapReduceTriplets] operator:
+{% highlight scala %}
+def mapReduceTriplets[A](
+    map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+    reduce: (A, A) => A)
+  : VertexRDD[A]
+{% endhighlight %}
+The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which
+is applied to each triplet and can yield *messages* destined to either, both, or neither of the
+vertices in the triplet.  We currently only support messages destined to the source or destination
+vertex of the triplet to enable optimized preaggregation.  The user defined `reduce` function
+combines the messages destined to each vertex.  The `mapReduceTriplets` operator returns a
+`VertexRDD[A]` containing the aggregate message to each vertex.  Vertices that do not receive a
+message are not included in the returned `VertexRDD`.
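+To make these semantics concrete, the following is a minimal sketch that models the map and
+reduce phases on plain Scala collections instead of GraphX.  The `Triplet` case class and the
+`localMapReduceTriplets` helper are hypothetical names introduced only for illustration; they are
+not part of the GraphX API, and the sketch ignores partitioning and preaggregation.  It assumes
+top-level definitions are allowed (Scala REPL or script).
+{% highlight scala %}
+// Hypothetical local stand-in for EdgeTriplet (not a GraphX class)
+case class Triplet[VD, ED](srcId: Long, srcAttr: VD, dstId: Long, dstAttr: VD, attr: ED)
+// Model of the semantics only: run `map` on every triplet, then combine
+// all messages addressed to the same vertex with `reduce`
+def localMapReduceTriplets[VD, ED, A](
+    triplets: Seq[Triplet[VD, ED]],
+    map: Triplet[VD, ED] => Iterator[(Long, A)],
+    reduce: (A, A) => A): Map[Long, A] = {
+  triplets
+    .flatMap(t => map(t))
+    .groupBy(_._1)
+    .map { case (vid, msgs) => vid -> msgs.map(_._2).reduce(reduce) }
+}
+// Count the followers (in-edges) of each vertex
+val triplets = Seq(
+  Triplet(1L, "a", 2L, "b", ()),
+  Triplet(3L, "c", 2L, "b", ()),
+  Triplet(2L, "b", 1L, "a", ()))
+val followers = localMapReduceTriplets[String, Unit, Int](
+  triplets, t => Iterator((t.dstId, 1)), _ + _)
+// Vertex 3 receives no message, so it has no entry in `followers`
+{% endhighlight %}
+As in the operator described above, a vertex that receives no message is simply absent from the
+result rather than mapped to a default value.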
+> Note that `mapReduceTriplets` takes an additional optional `activeSet` (see API docs) which
+> restricts the map phase to edges adjacent to the vertices in the provided `VertexRDD`.  Restricting
+> computation to triplets adjacent to a subset of the vertices is often necessary in incremental
+> iterative computation and is a key part of the GraphX implementation of Pregel.
+We can use the `mapReduceTriplets` operator to collect information about adjacent vertices.  For
+example, if we wanted to compute the average age of the followers who are older than each user, we
+could do the following.
+{% highlight scala %}
+// Graph with age as the vertex property
+val graph: Graph[Double, String] = getFromSomewhereElse()
+// Compute the number of older followers and their total age
+val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
+  triplet => { // Map Function
+    if (triplet.srcAttr > triplet.dstAttr) {
+      // Send message to destination vertex containing counter and age
+      Iterator((triplet.dstId, (1, triplet.srcAttr)))
+    } else {
+      // Don't send a message for this triplet
+      Iterator.empty
+    }
+  },
+  // Add counter and age
+  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
+)
+// Divide total age by number of older followers to get average age of older followers
+val avgAgeOlderFollowers: VertexRDD[Double] =
+  olderFollowers.mapValues { case (count, totalAge) => totalAge / count }
+{% endhighlight %}
+> Note that the `mapReduceTriplets` operation performs optimally when the messages (and their sums)
+> are constant sized (e.g., floats and addition instead of lists and concatenation).  More
+> precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex.
+Because it is often necessary to aggregate information about neighboring vertices, we also provide
+an alternative interface defined in [`GraphOps`][GraphOps]:
+{% highlight scala %}
+def aggregateNeighbors[A](
+    map: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
+    reduce: (A, A) => A,
+    edgeDir: EdgeDirection)
+  : VertexRDD[A]
+{% endhighlight %}
+The `aggregateNeighbors` operator is implemented directly on top of `mapReduceTriplets` but allows
+the user to define the logic in a more vertex centric manner.  Here the `map` function is provided
+the vertex to which the message is sent as well as one of the edges and returns the optional
+message value.  The `edgeDir` determines whether the `map` function is run on `In`, `Out`, or `All`
+edges adjacent to each vertex.
+### Computing Degree Information
+A common aggregation task is computing the degree of each vertex: the number of edges adjacent to
+each vertex.  In the context of directed graphs it is often necessary to know the in-degree,
+out-degree, and the total degree of each vertex.  The [`GraphOps`][GraphOps] class contains a
+collection of operators to compute the degrees of each vertex.  For example, in the following we
+compute the max in, out, and total degrees:
+{% highlight scala %}
+// Define a reduce operation to compute the highest degree vertex
+def maxReduce(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = {
+  if (a._2 > b._2) a else b
+}
+// Compute the max degrees
+val maxInDegree: (VertexID, Int)  = graph.inDegrees.reduce(maxReduce)
+val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(maxReduce)
+val maxDegrees: (VertexID, Int)   = graph.degrees.reduce(maxReduce)
+{% endhighlight %}
+### Collecting Neighbors
+In some cases it may be easier to express computation by collecting neighboring vertices and their
+attributes at each vertex.  This can be easily accomplished using the `collectNeighborIds` and the
+`collectNeighbors` operators.
+{% highlight scala %}
+def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
+def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
+{% endhighlight %}
+> Note that these operators can be quite costly as they duplicate information and require
+> substantial communication.  If possible try expressing the same computation using the
+> `mapReduceTriplets` operator directly.
 # Pregel API
 <a name="pregel"></a>
+Graphs are inherently recursive data-structures as properties of vertices depend on properties of
+their neighbors which in turn depend on properties of the neighbors of their neighbors.  As a
+consequence many important graph algorithms iteratively recompute the properties of each vertex
+until a fixed-point condition is reached.  A range of graph-parallel abstractions have been proposed
+to express these iterative algorithms.  GraphX exposes a Pregel operator which is a fusion of
+the widely used Pregel and GraphLab abstractions.
+At a high level, the GraphX variant of the Pregel abstraction is a bulk-synchronous parallel
+messaging abstraction constrained to the topology of the graph.  The Pregel operator executes in a
+series of super-steps in which vertices receive the sum of their inbound messages from the previous
+super-step, compute a new property value, and then send messages to neighboring vertices in the
+next super-step.  Vertices that do not receive a message are skipped within a super-step.  The
+Pregel operator terminates iteration and returns the final graph when there are no messages
+remaining.
+> Note, unlike more standard Pregel implementations, vertices in GraphX can only send messages to
+> neighboring vertices and the message construction is done in parallel using a user defined
+> messaging function.  These constraints allow additional optimization within GraphX.
+The following is the type signature of the Pregel operator as well as a *sketch* of its
+implementation (note calls to `graph.cache` have been removed):
+{% highlight scala %}
+def pregel[A]
+    (initialMsg: A,
+     maxIter: Int = Int.MaxValue,
+     activeDir: EdgeDirection = EdgeDirection.Out)
+    (vprog: (VertexID, VD, A) => VD,
+     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
+     mergeMsg: (A, A) => A)
+  : Graph[VD, ED] = {
+  // Receive the initial message at each vertex
+  var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
+  // compute the messages
+  var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
+  var activeMessages = messages.count()
+  // Loop until no messages remain or maxIter is reached
+  var i = 0
+  while (activeMessages > 0 && i < maxIter) {
+    // Receive the messages: -----------------------------------------------------------------------
+    // Run the vertex program on all vertices that receive messages
+    val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
+    // Merge the new vertex values back into the graph
+    g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
+    // Send Messages: ------------------------------------------------------------------------------
+    // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
+    // get to send messages.  More precisely the map phase of mapReduceTriplets is only invoked
+    // on edges in the activeDir of vertices in newVerts
+    messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
+    activeMessages = messages.count()
+    i += 1
+  }
+  g
+}
+{% endhighlight %}
+Notice that Pregel takes two argument lists (i.e., `graph.pregel(list1)(list2)`).  The first
+argument list contains configuration parameters including the initial message, the maximum number
+of iterations, and the edge direction in which to send messages (by default along out edges).  The
+second argument list contains the user defined functions for receiving messages (the vertex
+program `vprog`), computing messages (`sendMsg`), and combining messages (`mergeMsg`).
+We can use the Pregel operator to express computations such as single source shortest path, as in
+the following example.
+{% highlight scala %}
+val graph: Graph[String, Double] // A graph with edge attributes containing distances
+val sourceId: VertexID = 42 // The ultimate source
+// Initialize the graph such that all vertices except the root have distance infinity.
+val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
+val sssp = initialGraph.pregel(Double.PositiveInfinity)(
+  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
+  triplet => {  // Send Message
+    if(triplet.srcAttr + triplet.attr < triplet.dstAttr) {
+      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
+    } else {
+      Iterator.empty
+    }
+  },
+  (a,b) => math.min(a,b) // Merge Message
+  )
+{% endhighlight %}
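+To trace the shortest-path computation above without a cluster, the following minimal sketch
+models the super-step loop on plain Scala collections.  The `Edge` case class and the
+`localPregel` helper are hypothetical names used only for illustration and are not the GraphX
+implementation; the sketch assumes messages flow along out edges, that every edge endpoint
+appears in the vertex map, and that top-level definitions are allowed (Scala REPL or script).
+{% highlight scala %}
+// Hypothetical local edge type (not a GraphX class)
+case class Edge(src: Long, dst: Long, attr: Double)
+def localPregel[VD, A](
+    vertices: Map[Long, VD],
+    edges: Seq[Edge],
+    initialMsg: A,
+    maxIter: Int = Int.MaxValue)(
+    vprog: (Long, VD, A) => VD,
+    sendMsg: (Edge, VD, VD) => Iterator[(Long, A)],
+    mergeMsg: (A, A) => A): Map[Long, VD] = {
+  // Run sendMsg on the out edges of active vertices and merge messages per destination
+  def send(vs: Map[Long, VD], active: Set[Long]): Map[Long, A] =
+    edges
+      .filter(e => active(e.src))
+      .flatMap(e => sendMsg(e, vs(e.src), vs(e.dst)))
+      .groupBy(_._1)
+      .map { case (vid, msgs) => vid -> msgs.map(_._2).reduce(mergeMsg) }
+  // Every vertex receives the initial message
+  var vs = vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }
+  var messages = send(vs, vs.keySet)
+  var i = 0
+  while (messages.nonEmpty && i < maxIter) {
+    // Only vertices that received a message run the vertex program
+    vs = vs ++ messages.collect { case (id, m) if vs.contains(id) => id -> vprog(id, vs(id), m) }
+    // Only those vertices get to send messages in the next super-step
+    messages = send(vs, messages.keySet)
+    i += 1
+  }
+  vs
+}
+// Single source shortest path from vertex 1 on a three-vertex graph
+val edges = Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0))
+val init = Map(1L -> 0.0, 2L -> Double.PositiveInfinity, 3L -> Double.PositiveInfinity)
+val dist = localPregel[Double, Double](init, edges, Double.PositiveInfinity)(
+  (_, d, m) => math.min(d, m),
+  (e, s, d) => if (s + e.attr < d) Iterator((e.dst, s + e.attr)) else Iterator.empty,
+  (a, b) => math.min(a, b))
+{% endhighlight %}
+In this model vertex 3 is first reached directly with distance 5.0 and is then improved to 3.0
+through vertex 2 in the following super-step, after which no messages remain and the loop stops.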
 # Graph Builders
 <a name="graph_builders"></a>
