spark-dev mailing list archives

From Josh Rosen <rosenvi...@gmail.com>
Subject Re: KryoSerializer for closureSerializer in DAGScheduler
Date Mon, 31 Aug 2015 13:51:40 GMT
There are currently a few known issues with using KryoSerializer as the
closure serializer, so it's going to require some changes to Spark if we
want to properly support this. See https://github.com/apache/spark/pull/6361
and https://issues.apache.org/jira/browse/SPARK-7708 for some discussion of
the difficulties here.
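For anyone who wants to experiment anyway: in Spark 1.x the closure serializer is instantiated in SparkEnv from the `spark.closure.serializer` setting, defaulting to JavaSerializer, so the switch itself is just a config change; it is the serialization failures tracked in SPARK-7708 that make the combination unsupported. A sketch of that (unsupported) configuration:

```
# spark-defaults.conf sketch: flipping the closure serializer to Kryo.
# The key exists in Spark 1.x, but this combination is unsupported; it is
# exactly what surfaces the failures discussed in SPARK-7708.
spark.closure.serializer   org.apache.spark.serializer.KryoSerializer
```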

On Mon, Aug 31, 2015 at 3:44 AM, yash datta <saucam@gmail.com> wrote:

> Hi devs,
>
> Currently the only supported serializer for serializing tasks in
> DAGScheduler.scala is JavaSerializer:
>
>
> val taskBinaryBytes: Array[Byte] = stage match {
>   case stage: ShuffleMapStage =>
>     closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
>   case stage: ResultStage =>
>     closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()
>   }
>
> taskBinary = sc.broadcast(taskBinaryBytes)
>
>
> Could somebody give me pointers as to what is involved if we want to
> change it to KryoSerializer?
>
>
>
> One suggestion here,
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/bug-using-kryo-as-closure-serializer-td6473.html
>
> was to use chill-scala's KryoSerializer for the closureSerializer:
>
> private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
>
>
>
> But on digging into the code, it looks like the KryoSerializer already in
> use is built on the Twitter chill library.
>
> In KryoSerializer.scala:
>
> val instantiator = new EmptyScalaKryoInstantiator
> val kryo = instantiator.newKryo()
>
> ----------------------------------------
>
> package com.twitter.chill
> class EmptyScalaKryoInstantiator() extends com.twitter.chill.KryoInstantiator {
>   override def newKryo() : com.twitter.chill.KryoBase = { /* compiled code */ }
> }
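To see what that instantiator gives you, here is a minimal round-trip sketch using chill directly (a sketch assuming `com.twitter:chill` is on the classpath; `ChillRoundTrip` is mine, not Spark code). Note that plain Scala data like this round-trips fine; it is closures that hit the issues discussed above:

```scala
import com.twitter.chill.{EmptyScalaKryoInstantiator, Input, Output}
import java.io.ByteArrayOutputStream

object ChillRoundTrip {
  // Serialize and deserialize one object through a chill-built Kryo,
  // the same instantiator path KryoSerializer.scala uses.
  def roundTrip(obj: AnyRef): AnyRef = {
    val kryo = (new EmptyScalaKryoInstantiator).newKryo()
    val bos = new ByteArrayOutputStream()
    val out = new Output(bos)
    kryo.writeClassAndObject(out, obj)
    out.close()
    kryo.readClassAndObject(new Input(bos.toByteArray))
  }

  def main(args: Array[String]): Unit = {
    // A small stand-in for task data; chill registers Scala collections.
    println(roundTrip(Map("ts" -> 1L, "value" -> 2L)))
  }
}
```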
>
>
>
> I am working on a low-latency job, and much of the time is spent
> serializing the result stage RDD (~140 ms); the serialized size is 2.8 MB.
> Thoughts? Is this reasonable? I wanted to check if shifting to
> KryoSerializer helps here.
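One cheap way to sanity-check numbers like that off-cluster is to time plain Java serialization of a comparable payload, since that is the mechanism JavaSerializer wraps. A JDK-only sketch (`SerCost` and the stand-in payload are illustrative, not Spark code):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerCost {
  // Returns (serialized size in bytes, elapsed milliseconds) for one
  // pass of plain Java serialization.
  def measure(obj: AnyRef): (Int, Long) = {
    val start = System.nanoTime()
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    (bos.size(), (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in payload; a real check would pass something shaped like
    // the (stage.rdd, func) pair the DAGScheduler serializes.
    val payload: Array[Long] = Array.tabulate(350000)(_.toLong) // ~2.8 MB of longs
    val (bytes, ms) = measure(payload)
    println(s"serialized $bytes bytes in $ms ms")
  }
}
```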
>
> I am serializing a UnionRDD which is created by code like this :
>
>
> rdds here is a list of SchemaRDDs:
>
> val condition = 'column === indexValue
> val selectFields = UnresolvedAttribute("ts") :: fieldClass.selectFields
> val sddpp = rdds.par.map(x => x.where(condition).select(selectFields: _*))
> val rddpp = sddpp.map(x => new PartitionPruningRDD(x, partitioner.func))
> val unioned = new UnionRDD(sqlContext.sparkContext, rddpp.toList)
>
>
> My partitioner above selects one partition (out of 100) per RDD from the
> list of RDDs passed to UnionRDD, and the UnionRDD finally created has
> 127 partitions.
>
> Calling unioned.collect leads to serialization of the UnionRDD.
>
> I am using Spark 1.2.1.
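Worth noting for 1.2.x: the supported Kryo knob is `spark.serializer`, which covers data serialization (shuffle, cached and broadcast payloads) but not the task closures serialized in DAGScheduler; those still go through JavaSerializer regardless. A sketch (`com.example.MyRegistrator` is a hypothetical registrator class):

```
# spark-defaults.conf: Kryo for data serialization, supported in 1.2.x.
# Task closures are still serialized with JavaSerializer regardless.
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator  com.example.MyRegistrator
```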
>
>
> Any help regarding this will be highly appreciated.
>
>
> Best
> Yash Datta
>
>
> --
> When events unfold with calm and ease
> When the winds that blow are merely breeze
> Learn from nature, from birds and bees
> Live your life in love, and let joy not cease.
>
