spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: [SPARK-1931] Reconstruct routing tables in Graph.partitionBy
Date Mon, 26 May 2014 23:10:26 GMT
Repository: spark
Updated Branches:
  refs/heads/master cb7fe5034 -> 56c771cb2


[SPARK-1931] Reconstruct routing tables in Graph.partitionBy

905173df57b90f90ebafb22e43f55164445330e6 introduced a bug in partitionBy where, after repartitioning
the edges, it reuses the VertexRDD without updating the routing tables to reflect the new
edge layout. Subsequent accesses of the triplets contain nulls for many vertex properties.

This commit adds a test for this bug and fixes it by introducing `VertexRDD#withEdges` and
calling it in `partitionBy`.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #885 from ankurdave/SPARK-1931 and squashes the following commits:

3930cdd [Ankur Dave] Note how to set up VertexRDD for efficient joins
9bdbaa4 [Ankur Dave] [SPARK-1931] Reconstruct routing tables in Graph.partitionBy


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56c771cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56c771cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56c771cb

Branch: refs/heads/master
Commit: 56c771cb2d00a5843c391ae6561536ee46e535d4
Parents: cb7fe50
Author: Ankur Dave <ankurdave@gmail.com>
Authored: Mon May 26 16:10:22 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Mon May 26 16:10:22 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/graphx/VertexRDD.scala | 12 ++++++++++++
 .../scala/org/apache/spark/graphx/impl/GraphImpl.scala | 13 +++++++++----
 .../scala/org/apache/spark/graphx/GraphSuite.scala     | 10 ++++++++++
 3 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/56c771cb/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 8c62897..8b910fb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -300,6 +300,18 @@ class VertexRDD[@specialized VD: ClassTag](
   def reverseRoutingTables(): VertexRDD[VD] =
     this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
 
+  /** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
+  def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
+    val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get)
+    val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
+      (partIter, routingTableIter) =>
+        val routingTable =
+          if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+        partIter.map(_.withRoutingTable(routingTable))
+    }
+    new VertexRDD(vertexPartitions)
+  }
+
   /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
   private[graphx] def shipVertexAttributes(
       shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/56c771cb/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 2f2d0e0..1649b24 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -88,8 +88,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
         }
         val edgePartition = builder.toEdgePartition
         Iterator((pid, edgePartition))
-      }, preservesPartitioning = true))
-    GraphImpl.fromExistingRDDs(vertices, newEdges)
+      }, preservesPartitioning = true)).cache()
+    GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
   }
 
   override def reverse: Graph[VD, ED] = {
@@ -277,7 +277,11 @@ object GraphImpl {
     GraphImpl(vertexRDD, edgeRDD)
   }
 
-  /** Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. */
+  /**
+   * Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. The
+   * VertexRDD must already be set up for efficient joins with the EdgeRDD by calling
+   * `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
+   */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
       edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
@@ -290,7 +294,8 @@ object GraphImpl {
 
   /**
    * Create a graph from a VertexRDD and an EdgeRDD with the same replicated vertex type as the
-   * vertices.
+   * vertices. The VertexRDD must already be set up for efficient joins with the EdgeRDD by calling
+   * `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
    */
   def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],

http://git-wip-us.apache.org/repos/asf/spark/blob/56c771cb/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 7b9bac5..abc25d0 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -133,6 +133,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
         Iterator((part.srcIds ++ part.dstIds).toSet)
       }.collect
       assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
+
+      // Forming triplets view
+      val g = Graph(
+        sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
+        sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
+      assert(g.triplets.collect.map(_.toTuple).toSet ===
+        Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
+      val gPart = g.partitionBy(EdgePartition2D)
+      assert(gPart.triplets.collect.map(_.toTuple).toSet ===
+        Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
     }
   }
 


Mime
View raw message