spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: SPARK-1329: Create pid2vid with correct number of partitions
Date Thu, 17 Apr 2014 00:17:09 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 602b9ea65 -> b4ea3d972


SPARK-1329: Create pid2vid with correct number of partitions

Each vertex partition is co-located with a pid2vid array created in RoutingTable.scala. This
array maps edge partition IDs to the list of vertices in the current vertex partition that
are mentioned by edges in that partition. Therefore the pid2vid array should have one entry
per edge partition.

GraphX currently creates one entry per *vertex* partition, which is a bug that leads to an
ArrayIndexOutOfBoundsException when there are more edge partitions than vertex partitions.
This commit fixes the bug and adds a test for this case.

Resolves SPARK-1329. Thanks to Daniel Darabos for reporting this bug.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #368 from ankurdave/fix-pid2vid-size and squashes the following commits:

5a5c52a [Ankur Dave] SPARK-1329: Create pid2vid with correct number of partitions

(cherry picked from commit 17d323455a9c8b640f149be4a81139ed638765b5)
Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4ea3d97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4ea3d97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4ea3d97

Branch: refs/heads/branch-1.0
Commit: b4ea3d9725261873519589f22a93bc1f684db48d
Parents: 602b9ea
Author: Ankur Dave <ankurdave@gmail.com>
Authored: Wed Apr 16 17:16:55 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Wed Apr 16 17:17:05 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/graphx/impl/RoutingTable.scala     |  4 ++--
 .../test/scala/org/apache/spark/graphx/GraphSuite.scala | 12 ++++++++++++
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4ea3d97/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
index fe44e1e..022d566 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
@@ -69,9 +69,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
       vSet.iterator.map { vid => (vid, pid) }
     }
 
-    val numPartitions = vertices.partitions.size
+    val numEdgePartitions = edges.partitions.size
     vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
-      val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId])
+      val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
       for ((vid, pid) <- iter) {
         pid2vid(pid) += vid
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/b4ea3d97/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index d9ba467..32b5fe4 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -297,4 +297,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("more edge partitions than vertex partitions") {
+    withSpark { sc =>
+      val verts = sc.parallelize(List((1: VertexId, "a"), (2: VertexId, "b")), 1)
+      val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2)
+      val graph = Graph(verts, edges)
+      val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr))
+        .collect.toSet
+      assert(triplets ===
+        Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a")))
+    }
+  }
+
 }


Mime
View raw message