spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject spark git commit: [SPARK-5790][GraphX]: VertexRDD's won't zip properly for `diff` capability (added tests)
Date Sat, 14 Mar 2015 17:38:18 GMT
Repository: spark
Updated Branches:
  refs/heads/master 127268bc3 -> c49d15662


[SPARK-5790][GraphX]: VertexRDD's won't zip properly for `diff` capability (added tests)

Added tests that maropu [created](https://github.com/maropu/spark/blob/1f64794b2ce33e64f340e383d4e8a60639a7eb4b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala)
for vertices with differing partition counts. Wanted to make sure his work got captured/merged
as it's not in the master branch and I don't believe there's a PR out already for it.

Author: Brennon York <brennon.york@capitalone.com>

Closes #5023 from brennonyork/SPARK-5790 and squashes the following commits:

83bbd29 [Brennon York] added maropu's tests for vertices with differing partition counts


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c49d1566
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c49d1566
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c49d1566

Branch: refs/heads/master
Commit: c49d156624624a719c0d1262a58933ea3e346963
Parents: 127268b
Author: Brennon York <brennon.york@capitalone.com>
Authored: Sat Mar 14 17:38:12 2015 +0000
Committer: Sean Owen <sowen@cloudera.com>
Committed: Sat Mar 14 17:38:12 2015 +0000

----------------------------------------------------------------------
 .../apache/spark/graphx/VertexRDDSuite.scala    | 38 +++++++++++++++++++-
 1 file changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c49d1566/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 131959c..97533dd 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{HashPartitioner, SparkContext}
 import org.apache.spark.storage.StorageLevel
 
 class VertexRDDSuite extends FunSuite with LocalSparkContext {
@@ -58,6 +58,16 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("diff vertices with the non-equal number of partitions") {
+    withSpark { sc =>
+      val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => (i.toLong, 0)))
+      val vertexB = VertexRDD(sc.parallelize(8 until 16, 2).map(i => (i.toLong, 1)))
+      assert(vertexA.partitions.size != vertexB.partitions.size)
+      val vertexC = vertexA.diff(vertexB)
+      assert(vertexC.map(_._1).collect.toSet === (8 until 16).toSet)
+    }
+  }
+
   test("leftJoin") {
     withSpark { sc =>
       val n = 100
@@ -73,6 +83,19 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("leftJoin vertices with the non-equal number of partitions") {
+    withSpark { sc =>
+      val vertexA = VertexRDD(sc.parallelize(0 until 100, 2).map(i => (i.toLong, 1)))
+      val vertexB = VertexRDD(
+        vertexA.filter(v => v._1 % 2 == 0).partitionBy(new HashPartitioner(3)))
+      assert(vertexA.partitions.size != vertexB.partitions.size)
+      val vertexC = vertexA.leftJoin(vertexB) { (vid, old, newOpt) =>
+        old - newOpt.getOrElse(0)
+      }
+      assert(vertexC.filter(v => v._2 != 0).map(_._1).collect.toSet == (1 to 99 by 2).toSet)
+    }
+  }
+
   test("innerJoin") {
     withSpark { sc =>
       val n = 100
@@ -87,6 +110,19 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
         (0 to n by 2).map(x => (x.toLong, 0)).toSet)    }
   }
 
+  test("innerJoin vertices with the non-equal number of partitions") {
+    withSpark { sc =>
+      val vertexA = VertexRDD(sc.parallelize(0 until 100, 2).map(i => (i.toLong, 1)))
+      val vertexB = VertexRDD(
+        vertexA.filter(v => v._1 % 2 == 0).partitionBy(new HashPartitioner(3)))
+      assert(vertexA.partitions.size != vertexB.partitions.size)
+      val vertexC = vertexA.innerJoin(vertexB) { (vid, old, newVal) =>
+        old - newVal
+      }
+      assert(vertexC.filter(v => v._2 == 0).map(_._1).collect.toSet == (0 to 98 by 2).toSet)
+    }
+  }
+
   test("aggregateUsingIndex") {
     withSpark { sc =>
       val n = 100


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message