spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject git commit: [SPARK-2270] Kryo cannot serialize results returned by asJavaIterable
Date Wed, 25 Jun 2014 19:43:42 GMT
Repository: spark
Updated Branches:
  refs/heads/master 9aa603296 -> 7ff2c754f


[SPARK-2270] Kryo cannot serialize results returned by asJavaIterable

(and thus groupBy/cogroup are broken in Java APIs when Kryo is used).

@pwendell this should be merged into 1.0.1.

Thanks @sorenmacbeth for reporting this & helping out with the fix.

Author: Reynold Xin <rxin@apache.org>

Closes #1206 from rxin/kryo-iterable-2270 and squashes the following commits:

09da0aa [Reynold Xin] Updated the comment.
009bf64 [Reynold Xin] [SPARK-2270] Kryo cannot serialize results returned by asJavaIterable
(and thus groupBy/cogroup are broken in Java APIs when Kryo is used).


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ff2c754
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ff2c754
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ff2c754

Branch: refs/heads/master
Commit: 7ff2c754f340ba4c4077b0ff6285876eb7871c7b
Parents: 9aa6032
Author: Reynold Xin <rxin@apache.org>
Authored: Wed Jun 25 12:43:22 2014 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Wed Jun 25 12:43:22 2014 -0700

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       | 50 ++++++++++++++++++++
 .../spark/serializer/KryoSerializerSuite.scala  | 15 ++++++
 2 files changed, 65 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7ff2c754/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 5286f7b..82b62aa 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -64,6 +64,9 @@ class KryoSerializer(conf: SparkConf)
       kryo.register(cls)
     }
 
+    // For results returned by asJavaIterable. See JavaIterableWrapperSerializer.
+    kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer)
+
     // Allow sending SerializableWritable
     kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
     kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
@@ -183,3 +186,50 @@ private[serializer] object KryoSerializer {
     classOf[Array[Byte]]
   )
 }
+
+/**
+ * Kryo serializer for the java.lang.Iterable wrappers handed out by asJavaIterable.
+ *
+ * Such a wrapper is a scala.collection.convert.Wrappers$IterableWrapper. Left to itself, Kryo
+ * deserializes it into an AbstractCollection, which does not work, so the Scala side is sent.
+ */
+private class JavaIterableWrapperSerializer
+  extends com.esotericsoftware.kryo.Serializer[java.lang.Iterable[_]] {
+
+  import JavaIterableWrapperSerializer._
+
+  override def write(kryo: Kryo, out: KryoOutput, obj: java.lang.Iterable[_]): Unit = {
+    // A wrapper is replaced by the Scala Iterable it wraps, provided the reflective accessor
+    // was found; any other object (or a wrapper without accessor) is written unchanged.
+    val isWrapper = obj.getClass == wrapperClass
+    val payload: AnyRef = underlyingMethodOpt match {
+      case Some(accessor) if isWrapper => accessor.invoke(obj)
+      case _ => obj
+    }
+    kryo.writeClassAndObject(out, payload)
+  }
+
+  override def read(kryo: Kryo, in: KryoInput, clz: Class[java.lang.Iterable[_]])
+    : java.lang.Iterable[_] = {
+    kryo.readClassAndObject(in) match {
+      // A Scala Iterable came from a wrapper: wrap it again for the Java caller.
+      case unwrapped: Iterable[_] => scala.collection.JavaConversions.asJavaIterable(unwrapped)
+      case plain: java.lang.Iterable[_] => plain
+    }
+  }
+}
+
+private object JavaIterableWrapperSerializer extends Logging {
+  // Runtime class of asJavaIterable's result (scala.collection.convert.Wrappers$IterableWrapper).
+  val wrapperClass = scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass
+
+  // Reflective handle on the wrapper's "underlying" accessor; None (and logged) if lookup fails.
+  private val underlyingMethodOpt =
+    try {
+      Some(wrapperClass.getDeclaredMethod("underlying"))
+    } catch {
+      case e: Exception =>
+        logError("Failed to find the underlying field in " + wrapperClass, e)
+        None
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff2c754/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index cdd6b3d..79280d1 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -128,6 +128,21 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
     check(1.0 until 1000000.0 by 2.0)
   }
 
+  test("asJavaIterable") {
+    // Round-trip a Scala collection wrapped by asJavaIterable and check its element survives.
+    val ser = new KryoSerializer(conf).newInstance()
+    val a = ser.serialize(scala.collection.convert.WrapAsJava.asJavaIterable(Seq(12345)))
+    val b = ser.deserialize[java.lang.Iterable[Int]](a)
+    assert(b.iterator().next() === 12345)
+
+    // Round-trip a plain Java collection; assert on `d`, the value deserialized from it.
+    val col = new java.util.ArrayList[Int]
+    col.add(54321)
+    val c = ser.serialize(col)
+    val d = ser.deserialize[java.lang.Iterable[Int]](c)
+    assert(d.iterator().next() === 54321)
+  }
+
   test("custom registrator") {
     val ser = new KryoSerializer(conf).newInstance()
     def check[T: ClassTag](t: T) {


Mime
View raw message