Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7996910E60 for ; Wed, 9 Oct 2013 23:56:29 +0000 (UTC) Received: (qmail 61341 invoked by uid 500); 9 Oct 2013 23:56:29 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 61319 invoked by uid 500); 9 Oct 2013 23:56:29 -0000 Mailing-List: contact commits-help@spark.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@spark.incubator.apache.org Delivered-To: mailing list commits@spark.incubator.apache.org Received: (qmail 61312 invoked by uid 99); 9 Oct 2013 23:56:29 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Oct 2013 23:56:29 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 09 Oct 2013 23:56:28 +0000 Received: (qmail 60865 invoked by uid 99); 9 Oct 2013 23:56:07 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Oct 2013 23:56:07 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B6A10913915; Wed, 9 Oct 2013 23:56:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rxin@apache.org To: commits@spark.incubator.apache.org Message-Id: <2087e5514b014221873004a9d4354f19@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: Merge pull request #49 from mateiz/kryo-fix-2 Date: Wed, 9 Oct 2013 23:56:07 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/branch-0.8 0b6f047b5 -> dfc62e294 Merge pull request #49 from mateiz/kryo-fix-2 Fix Chill serialization of Range objects It used to write out each element one by one, creating very large objects. (cherry picked from commit 320418f7c8b42d4ce781b32c9ee47a9b54550b89) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/dfc62e29 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/dfc62e29 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/dfc62e29 Branch: refs/heads/branch-0.8 Commit: dfc62e294dc942303c01dc17a9c2974e8fa9668e Parents: 0b6f047 Author: Reynold Xin Authored: Wed Oct 9 16:55:30 2013 -0700 Committer: Reynold Xin Committed: Wed Oct 9 16:55:58 2013 -0700 ---------------------------------------------------------------------- .../spark/serializer/KryoSerializer.scala | 14 ++++++++++--- .../spark/serializer/KryoSerializerSuite.scala | 21 ++++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dfc62e29/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 6c500ba..e936b1c 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -23,7 +23,7 @@ import java.io.{EOFException, InputStream, OutputStream} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import com.esotericsoftware.kryo.{KryoException, Kryo} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} -import com.twitter.chill.ScalaKryoInstantiator +import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar} import org.apache.spark.{SerializableWritable, Logging} import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock, StorageLevel} @@ -39,7 +39,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging def newKryoOutput() = new KryoOutput(bufferSize) def newKryo(): Kryo = { - val instantiator = new ScalaKryoInstantiator + val instantiator = new EmptyScalaKryoInstantiator val kryo = instantiator.newKryo() val classLoader = Thread.currentThread.getContextClassLoader @@ -49,7 +49,11 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging StorageLevel.MEMORY_ONLY, PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY), GotBlock("1", ByteBuffer.allocate(1)), - GetBlock("1") + GetBlock("1"), + 1 to 10, + 1 until 10, + 1L to 10L, + 1L until 10L ) for (obj <- toRegister) kryo.register(obj.getClass) @@ -69,6 +73,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging case _: Exception => println("Failed to register spark.kryo.registrator") } + // Register Chill's classes; we do this after our ranges and the user's own classes to let + // our code override the generic serialziers in Chill for things like Seq + new AllScalaRegistrar().apply(kryo) + kryo.setClassLoader(classLoader) // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dfc62e29/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 0164dda..c016c51 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -103,6 +103,27 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three"))) } + test("ranges") { + val ser = (new KryoSerializer).newInstance() + def check[T](t: T) { + assert(ser.deserialize[T](ser.serialize(t)) === t) + // Check that very long ranges don't get written one element at a time + assert(ser.serialize(t).limit < 100) + } + check(1 to 1000000) + check(1 to 1000000 by 2) + check(1 until 1000000) + check(1 until 1000000 by 2) + check(1L to 1000000L) + check(1L to 1000000L by 2L) + check(1L until 1000000L) + check(1L until 1000000L by 2L) + check(1.0 to 1000000.0 by 1.0) + check(1.0 to 1000000.0 by 2.0) + check(1.0 until 1000000.0 by 1.0) + check(1.0 until 1000000.0 by 2.0) + } + test("custom registrator") { System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)