From: joshrosen@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-7311] Introduce internal Serializer API for determining if serializers support object relocation
Date: Wed, 6 May 2015 17:53:24 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-1.4 b521a3b03 -> d651e2838


[SPARK-7311] Introduce internal Serializer API for determining if serializers support object relocation

This patch extends the `Serializer` interface with a new `Private` API which allows serializers to indicate whether they support relocation of serialized objects in serializer stream output.

This relocatability property is described in more detail in `Serializer.scala`, but in a nutshell a serializer supports relocation if reordering the bytes of serialized objects in serialization stream output is equivalent to having re-ordered those elements prior to serializing them. The optimized shuffle paths introduced in #4450 and #5868 both rely on serializers having this property; this patch just centralizes the logic for determining whether a serializer has it. I also added tests and comments clarifying when this works for KryoSerializer.

This change allows the optimizations in #4450 to be applied for shuffles that use `SqlSerializer2`.

Author: Josh Rosen

Closes #5924 from JoshRosen/SPARK-7311 and squashes the following commits:

50a68ca [Josh Rosen] Address minor nits
0a7ebd7 [Josh Rosen] Clarify reason why SqlSerializer2 supports this serializer
123b992 [Josh Rosen] Cleanup for submitting as standalone patch.
4aa61b2 [Josh Rosen] Add missing newline
2c1233a [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use:
0ba75e6 [Josh Rosen] Add tests for serializer relocation property.
450fa21 [Josh Rosen] Back out accidental log4j.properties change
86d4dcd [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation
b9624ee [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used.
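
To make the relocation contract concrete, here is a minimal, self-contained Scala sketch (illustrative only, not part of this patch; the object and variable names are invented) that exercises the property against KryoSerializer, which supports relocation under its default auto-reset behavior:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

object RelocationContractSketch {
  def main(args: Array[String]): Unit = {
    val ser = new KryoSerializer(new SparkConf()).newInstance()
    val baos = new ByteArrayOutputStream()
    val out = ser.serializeStream(baos)

    // Write two objects, recording the byte range that each one occupies.
    out.writeObject("obj1")
    out.flush()
    val obj1End = baos.size()  // bytes [0, obj1End) hold obj1
    out.writeObject("obj2")
    out.flush()
    out.close()

    val bytes = baos.toByteArray
    val obj1Bytes = bytes.slice(0, obj1End)
    val obj2Bytes = bytes.slice(obj1End, bytes.length)

    // Relocation: feeding [obj2Bytes] ++ [obj1Bytes] back through the
    // serializer must yield (obj2, obj1), i.e. the objects in swapped order.
    val in = ser.deserializeStream(new ByteArrayInputStream(obj2Bytes ++ obj1Bytes))
    val it = in.asIterator
    assert(it.next() == "obj2")
    assert(it.next() == "obj1")
    in.close()
  }
}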
(cherry picked from commit 002c12384d6ecebbb3e7fc853dbdfbc5aaa3d6a6)
Signed-off-by: Josh Rosen


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d651e283
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d651e283
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d651e283

Branch: refs/heads/branch-1.4
Commit: d651e28383a590f260be62be91f1e0770a21267a
Parents: b521a3b
Author: Josh Rosen
Authored: Wed May 6 10:52:55 2015 -0700
Committer: Josh Rosen
Committed: Wed May 6 10:53:19 2015 -0700

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       |   7 ++
 .../apache/spark/serializer/Serializer.scala    |  35 +++++-
 .../spark/util/collection/ExternalSorter.scala  |   3 +-
 .../serializer/SerializerPropertiesSuite.scala  | 119 +++++++++++++++++++
 .../sql/execution/SparkSqlSerializer2.scala     |   5 +
 5 files changed, 166 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d651e283/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index b7bc087..f9f7885 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -125,6 +125,13 @@ class KryoSerializer(conf: SparkConf)
   override def newInstance(): SerializerInstance = {
     new KryoSerializerInstance(this)
   }
+
+  private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
+    // If auto-reset is disabled, then Kryo may store references to duplicate occurrences of objects
+    // in the stream rather than writing those objects' serialized bytes, breaking relocation. See
+    // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details.
+    newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
+  }
 }

 private[spark]
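
For context on the auto-reset caveat in the comment above, the following sketch (illustrative, not part of this patch; NoAutoResetRegistrator is an invented name, and the file is placed in the org.apache.spark.serializer package so the private[spark] members are visible) shows how a custom KryoRegistrator can disable auto-reset and thereby disable relocation support. It mirrors the RegistratorWithoutAutoReset helper used by the test suite later in this patch:

package org.apache.spark.serializer

import com.esotericsoftware.kryo.Kryo

import org.apache.spark.SparkConf

// Disabling auto-reset makes Kryo emit back-references to previously seen
// objects, so serialized byte ranges are no longer self-contained.
class NoAutoResetRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.setAutoReset(false)
  }
}

object NoAutoResetSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.kryo.registrator",
      classOf[NoAutoResetRegistrator].getName)
    val ser = new KryoSerializer(conf)
    // The lazy val above probes a fresh instance's auto-reset setting, so the
    // serializer now reports that relocation is unsupported.
    assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
    assert(!ser.supportsRelocationOfSerializedObjects)
  }
}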
http://git-wip-us.apache.org/repos/asf/spark/blob/d651e283/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index c381672..6078c9d 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
 import scala.reflect.ClassTag

 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{DeveloperApi, Private}
 import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator}

 /**
@@ -63,6 +63,39 @@ abstract class Serializer {

   /** Creates a new [[SerializerInstance]]. */
   def newInstance(): SerializerInstance
+
+  /**
+   * :: Private ::
+   * Returns true if this serializer supports relocation of its serialized objects and false
+   * otherwise. This should return true if and only if reordering the bytes of serialized objects
+   * in serialization stream output is equivalent to having re-ordered those elements prior to
+   * serializing them. More specifically, the following should hold if a serializer supports
+   * relocation:
+   *
+   * {{{
+   * serOut.open()
+   * position = 0
+   * serOut.write(obj1)
+   * serOut.flush()
+   * position = # of bytes written to stream so far
+   * obj1Bytes = output[0:position-1]
+   * serOut.write(obj2)
+   * serOut.flush()
+   * position2 = # of bytes written to stream so far
+   * obj2Bytes = output[position:position2-1]
+   * serIn.open([obj2Bytes] concatenate [obj1Bytes]) should return (obj2, obj1)
+   * }}}
+   *
+   * In general, this property should hold for serializers that are stateless and that do not
+   * write special metadata at the beginning or end of the serialization stream.
+   *
+   * This API is private to Spark; this method should not be overridden in third-party subclasses
+   * or called in user code and is subject to removal in future Spark releases.
+   *
+   * See SPARK-7311 for more details.
+   */
+  @Private
+  private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/d651e283/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index b7306cd..7d5cf7b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -131,8 +131,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB
   private val useSerializedPairBuffer =
     !ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
-    ser.isInstanceOf[KryoSerializer] &&
-    serInstance.asInstanceOf[KryoSerializerInstance].getAutoReset
+    ser.supportsRelocationOfSerializedObjects

   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
http://git-wip-us.apache.org/repos/asf/spark/blob/d651e283/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
new file mode 100644
index 0000000..bb34033
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.util.Random
+
+import org.scalatest.{Assertions, FunSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset
+
+/**
+ * Tests to ensure that [[Serializer]] implementations obey the API contracts for methods that
+ * describe properties of the serialized stream, such as
+ * [[Serializer.supportsRelocationOfSerializedObjects]].
+ */
+class SerializerPropertiesSuite extends FunSuite {
+
+  import SerializerPropertiesSuite._
+
+  test("JavaSerializer does not support relocation") {
+    // Per a comment on the SPARK-4550 JIRA ticket, Java serialization appears to write out the
+    // full class name the first time an object is written to an output stream, but subsequent
+    // references to the class write a more compact identifier; this prevents relocation.
+    val ser = new JavaSerializer(new SparkConf())
+    testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
+  }
+
+  test("KryoSerializer supports relocation when auto-reset is enabled") {
+    val ser = new KryoSerializer(new SparkConf)
+    assert(ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
+    testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
+  }
+
+  test("KryoSerializer does not support relocation when auto-reset is disabled") {
+    val conf = new SparkConf().set("spark.kryo.registrator",
+      classOf[RegistratorWithoutAutoReset].getName)
+    val ser = new KryoSerializer(conf)
+    assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
+    testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
+  }
+
+}
+
+object SerializerPropertiesSuite extends Assertions {
+
+  def generateRandomItem(rand: Random): Any = {
+    val randomFunctions: Seq[() => Any] = Seq(
+      () => rand.nextInt(),
+      () => rand.nextString(rand.nextInt(10)),
+      () => rand.nextDouble(),
+      () => rand.nextBoolean(),
+      () => (rand.nextInt(), rand.nextString(rand.nextInt(10))),
+      () => MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10))),
+      () => {
+        val x = MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10)))
+        (x, x)
+      }
+    )
+    randomFunctions(rand.nextInt(randomFunctions.size)).apply()
+  }
+
+  def testSupportsRelocationOfSerializedObjects(
+      serializer: Serializer,
+      generateRandomItem: Random => Any): Unit = {
+    if (!serializer.supportsRelocationOfSerializedObjects) {
+      return
+    }
+    val NUM_TRIALS = 5
+    val rand = new Random(42)
+    for (_ <- 1 to NUM_TRIALS) {
+      val items = {
+        // Make sure that we have duplicate occurrences of the same object in the stream:
+        val randomItems = Seq.fill(10)(generateRandomItem(rand))
+        randomItems ++ randomItems.take(5)
+      }
+      val baos = new ByteArrayOutputStream()
+      val serStream = serializer.newInstance().serializeStream(baos)
+      def serializeItem(item: Any): Array[Byte] = {
+        val itemStartOffset = baos.toByteArray.length
+        serStream.writeObject(item)
+        serStream.flush()
+        val itemEndOffset = baos.toByteArray.length
+        baos.toByteArray.slice(itemStartOffset, itemEndOffset).clone()
+      }
+      val itemsAndSerializedItems: Seq[(Any, Array[Byte])] = {
+        val serItems = items.map {
+          item => (item, serializeItem(item))
+        }
+        serStream.close()
+        rand.shuffle(serItems)
+      }
+      val reorderedSerializedData: Array[Byte] = itemsAndSerializedItems.flatMap(_._2).toArray
+      val deserializedItemsStream = serializer.newInstance().deserializeStream(
+        new ByteArrayInputStream(reorderedSerializedData))
+      assert(deserializedItemsStream.asIterator.toSeq === itemsAndSerializedItems.map(_._1))
+      deserializedItemsStream.close()
+    }
+  }
+}
+
+private case class MyCaseClass(foo: Int, bar: String)


http://git-wip-us.apache.org/repos/asf/spark/blob/d651e283/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 9552f41..35ad987 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -154,6 +154,11 @@ private[sql] class SparkSqlSerializer2(keySchema: Array[DataType], valueSchema:
   with Serializable{

   def newInstance(): SerializerInstance = new ShuffleSerializerInstance(keySchema, valueSchema)
+
+  override def supportsRelocationOfSerializedObjects: Boolean = {
+    // SparkSqlSerializer2 is stateless and writes no stream headers
+    true
+  }
 }

 private[sql] object SparkSqlSerializer2 {
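
As a companion to the suite's JavaSerializer test, this standalone sketch (illustrative, not part of this patch; Point and the object name are invented) shows why plain Java serialization output is not relocatable: the first record of a class carries the full class descriptor while later records only back-reference it, so a later record's bytes are not self-contained:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Case classes are Serializable, so they can go through ObjectOutputStream.
case class Point(x: Int, y: Int)

object JavaSerializationBackrefSketch {
  def main(args: Array[String]): Unit = {
    val baos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(baos)

    oos.writeObject(Point(1, 2))
    oos.flush()
    val afterFirst = baos.size()

    oos.writeObject(Point(3, 4))
    oos.flush()
    val afterSecond = baos.size()
    oos.close()

    // The second record is much smaller: it reuses the class descriptor
    // written with the first record via a back-reference, so those bytes
    // cannot be deserialized if they are moved ahead of the first record.
    println(s"first record: $afterFirst bytes, second record: ${afterSecond - afterFirst} bytes")
  }
}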