spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: [SPARK-2455] Mark (Shippable)VertexPartition serializable
Date Sat, 12 Jul 2014 19:05:51 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 2a5514f7d -> 354ce4d30


[SPARK-2455] Mark (Shippable)VertexPartition serializable

VertexPartition and ShippableVertexPartition are contained in RDDs but are not marked Serializable,
leading to NotSerializableExceptions when using Java serialization.

The fix is simply to mark them as Serializable. This PR does that and adds a test for serializing
them using Java and Kryo serialization.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #1376 from ankurdave/SPARK-2455 and squashes the following commits:

ed4a51b [Ankur Dave] Make (Shippable)VertexPartition serializable
1fd42c5 [Ankur Dave] Add failing tests for Java serialization

(cherry picked from commit 7a0135293192aaefc6ae20b57e15a90945bd8a4e)
Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/354ce4d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/354ce4d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/354ce4d3

Branch: refs/heads/branch-1.0
Commit: 354ce4d30db41a5bff932c8f73ae565038e389f1
Parents: 2a5514f
Author: Ankur Dave <ankurdave@gmail.com>
Authored: Sat Jul 12 12:05:34 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Sat Jul 12 12:05:48 2014 -0700

----------------------------------------------------------------------
 .../graphx/impl/RoutingTablePartition.scala     |  2 +-
 .../spark/graphx/impl/VertexPartitionBase.scala |  3 ++-
 .../graphx/impl/VertexPartitionBaseOps.scala    |  2 +-
 .../spark/graphx/impl/EdgePartitionSuite.scala  | 24 ++++++++++++--------
 .../graphx/impl/VertexPartitionSuite.scala      | 20 +++++++++++++++-
 5 files changed, 37 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/354ce4d3/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
index d02e923..2c0dcff 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
@@ -119,7 +119,7 @@ object RoutingTablePartition {
  */
 private[graphx]
 class RoutingTablePartition(
-    private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) {
+    private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) extends Serializable
{
   /** The maximum number of edge partitions this `RoutingTablePartition` is built to join
with. */
   val numEdgePartitions: Int = routingTable.size
 

http://git-wip-us.apache.org/repos/asf/spark/blob/354ce4d3/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
index 34939b2..5ad6390 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
@@ -60,7 +60,8 @@ private[graphx] object VertexPartitionBase {
  * `VertexPartitionBaseOpsConstructor` typeclass (for example,
  * [[VertexPartition.VertexPartitionOpsConstructor]]).
  */
-private[graphx] abstract class VertexPartitionBase[@specialized(Long, Int, Double) VD: ClassTag]
{
+private[graphx] abstract class VertexPartitionBase[@specialized(Long, Int, Double) VD: ClassTag]
+  extends Serializable {
 
   def index: VertexIdToIndexMap
   def values: Array[VD]

http://git-wip-us.apache.org/repos/asf/spark/blob/354ce4d3/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
index a4f769b..b40aa1b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
@@ -35,7 +35,7 @@ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 private[graphx] abstract class VertexPartitionBaseOps
     [VD: ClassTag, Self[X] <: VertexPartitionBase[X] : VertexPartitionBaseOpsConstructor]
     (self: Self[VD])
-    extends Logging {
+  extends Serializable with Logging {
 
   def withIndex(index: VertexIdToIndexMap): Self[VD]
   def withValues[VD2: ClassTag](values: Array[VD2]): Self[VD2]

http://git-wip-us.apache.org/repos/asf/spark/blob/354ce4d3/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index 28fd112..9d00f76 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -23,6 +23,7 @@ import scala.util.Random
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkConf
+import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.serializer.KryoSerializer
 
 import org.apache.spark.graphx._
@@ -124,18 +125,21 @@ class EdgePartitionSuite extends FunSuite {
     assert(ep.numActives == Some(2))
   }
 
-  test("Kryo serialization") {
+  test("serialization") {
     val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
     val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
-    val conf = new SparkConf()
+    val javaSer = new JavaSerializer(new SparkConf())
+    val kryoSer = new KryoSerializer(new SparkConf()
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
-    val s = new KryoSerializer(conf).newInstance()
-    val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
-    assert(aSer.srcIds.toList === a.srcIds.toList)
-    assert(aSer.dstIds.toList === a.dstIds.toList)
-    assert(aSer.data.toList === a.data.toList)
-    assert(aSer.index != null)
-    assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator"))
+
+    for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) {
+      val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
+      assert(aSer.srcIds.toList === a.srcIds.toList)
+      assert(aSer.dstIds.toList === a.dstIds.toList)
+      assert(aSer.data.toList === a.data.toList)
+      assert(aSer.index != null)
+      assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/354ce4d3/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
index 8bf1384..f9e771a 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
@@ -17,9 +17,14 @@
 
 package org.apache.spark.graphx.impl
 
-import org.apache.spark.graphx._
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.KryoSerializer
+
+import org.apache.spark.graphx._
+
 class VertexPartitionSuite extends FunSuite {
 
   test("isDefined, filter") {
@@ -116,4 +121,17 @@ class VertexPartitionSuite extends FunSuite {
     assert(vp3.index.getPos(2) === -1)
   }
 
+  test("serialization") {
+    val verts = Set((0L, 1), (1L, 1), (2L, 1))
+    val vp = VertexPartition(verts.iterator)
+    val javaSer = new JavaSerializer(new SparkConf())
+    val kryoSer = new KryoSerializer(new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator"))
+
+    for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) {
+      val vpSer: VertexPartition[Int] = s.deserialize(s.serialize(vp))
+      assert(vpSer.iterator.toSet === verts)
+    }
+  }
 }


Mime
View raw message