spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: Merge pull request #49 from mateiz/kryo-fix-2
Date Wed, 09 Oct 2013 23:56:07 GMT
Updated Branches:
  refs/heads/branch-0.8 0b6f047b5 -> dfc62e294


Merge pull request #49 from mateiz/kryo-fix-2

Fix Chill serialization of Range objects

Previously, Chill serialized a Range by writing out each element one by one, producing very large serialized objects.

(cherry picked from commit 320418f7c8b42d4ce781b32c9ee47a9b54550b89)
Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/dfc62e29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/dfc62e29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/dfc62e29

Branch: refs/heads/branch-0.8
Commit: dfc62e294dc942303c01dc17a9c2974e8fa9668e
Parents: 0b6f047
Author: Reynold Xin <rxin@apache.org>
Authored: Wed Oct 9 16:55:30 2013 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Wed Oct 9 16:55:58 2013 -0700

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       | 14 ++++++++++---
 .../spark/serializer/KryoSerializerSuite.scala  | 21 ++++++++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dfc62e29/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 6c500ba..e936b1c 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -23,7 +23,7 @@ import java.io.{EOFException, InputStream, OutputStream}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import com.esotericsoftware.kryo.{KryoException, Kryo}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import com.twitter.chill.ScalaKryoInstantiator
+import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
 
 import org.apache.spark.{SerializableWritable, Logging}
 import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock, StorageLevel}
@@ -39,7 +39,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
   def newKryoOutput() = new KryoOutput(bufferSize)
 
   def newKryo(): Kryo = {
-    val instantiator = new ScalaKryoInstantiator
+    val instantiator = new EmptyScalaKryoInstantiator
     val kryo = instantiator.newKryo()
     val classLoader = Thread.currentThread.getContextClassLoader
 
@@ -49,7 +49,11 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
       StorageLevel.MEMORY_ONLY,
       PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
       GotBlock("1", ByteBuffer.allocate(1)),
-      GetBlock("1")
+      GetBlock("1"),
+      1 to 10,
+      1 until 10,
+      1L to 10L,
+      1L until 10L
     )
 
     for (obj <- toRegister) kryo.register(obj.getClass)
@@ -69,6 +73,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
       case _: Exception => println("Failed to register spark.kryo.registrator")
     }
 
+    // Register Chill's classes; we do this after our ranges and the user's own classes to let
+    // our code override the generic serializers in Chill for things like Seq
+    new AllScalaRegistrar().apply(kryo)
+
     kryo.setClassLoader(classLoader)
 
    // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dfc62e29/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 0164dda..c016c51 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -103,6 +103,27 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
     check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three")))
   }
 
+  test("ranges") {
+    val ser = (new KryoSerializer).newInstance()
+    def check[T](t: T) {
+      assert(ser.deserialize[T](ser.serialize(t)) === t)
+      // Check that very long ranges don't get written one element at a time
+      assert(ser.serialize(t).limit < 100)
+    }
+    check(1 to 1000000)
+    check(1 to 1000000 by 2)
+    check(1 until 1000000)
+    check(1 until 1000000 by 2)
+    check(1L to 1000000L)
+    check(1L to 1000000L by 2L)
+    check(1L until 1000000L)
+    check(1L until 1000000L by 2L)
+    check(1.0 to 1000000.0 by 1.0)
+    check(1.0 to 1000000.0 by 2.0)
+    check(1.0 until 1000000.0 by 1.0)
+    check(1.0 until 1000000.0 by 2.0)
+  }
+
   test("custom registrator") {
     System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
 


Mime
View raw message