spark-user mailing list archives

From lev <kat...@gmail.com>
Subject Prevent spark from serializing some objects
Date Tue, 15 Sep 2015 12:30:30 GMT
Hello,

As I understand it, calling the method bar below results in the Foo instance
being serialized and sent to the cluster:

----
import org.apache.spark.rdd.RDD

class Foo() {
    val x = 5

    def bar(rdd: RDD[Int]): RDD[Int] = {
        rdd.map(_ * x)
    }
}
----

Because _ * x inside bar really means _ * this.x, the closure captures the
whole Foo instance, and since that instance might be very large, this can
cause a performance hit.
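A minimal stdlib-only sketch of that capture (no Spark needed), assuming plain Java serialization as Spark uses for task closures by default. Here Foo is deliberately not Serializable so the capture becomes visible, and mapper is a hypothetical stand-in for the function passed to rdd.map; because the lambda reads the field x, it captures this, and serializing it drags in the whole Foo:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.util.{Failure, Try}

object ClosureCaptureDemo {
  // Deliberately NOT Serializable, to make the capture visible.
  class Foo {
    val x = 5

    // Hypothetical stand-in for the function in rdd.map(_ * x):
    // `x` here means `this.x`, so the lambda captures the whole Foo.
    def mapper: Int => Int = _ * x
  }

  // Serialize an object graph with plain Java serialization.
  def javaSerialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    Try(javaSerialize(new Foo().mapper)) match {
      case Failure(e: NotSerializableException) =>
        println(s"closure dragged in a non-serializable object: ${e.getMessage}")
      case other =>
        println(s"unexpected result: $other")
    }
  }
}
```

The failure comes from Foo, not from the lambda itself, which is what makes the size of Foo matter.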

I know how to solve this particular case: copy x into a local val inside bar
and use that in the closure.
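A stdlib-only sketch of that workaround, under the same assumption of plain Java serialization (mapper is again a hypothetical stand-in for the rdd.map function). With the field copied into a local val, the lambda captures only an Int, so it serializes even though Foo itself is still not Serializable:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object LocalCopyDemo {
  // Still deliberately NOT Serializable.
  class Foo {
    val x = 5

    // Hypothetical stand-in for bar's rdd.map function: copy the field into
    // a local val first, so the lambda captures only an Int, not `this`.
    def mapper: Int => Int = {
      val localX = x
      i => i * localX
    }
  }

  // Serialize an object graph with plain Java serialization.
  def javaSerialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val f = new Foo().mapper
    println(s"serialized closure: ${javaSerialize(f).length} bytes, f(3) = ${f(3)}")
  }
}
```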

What I would like to know is whether there is a way to prevent Foo from ever
being serialized and sent to the cluster.

I can't simply make Foo non-serializable, since it needs to be serialized at
another stage (not sent to Spark, just saved to disk).

One idea I tried was to create a marker trait:

----
trait SparkNonSerializable

class Foo extends SparkNonSerializable {...}
----

and then use a custom serializer in Spark (configured through the
"spark.serializer" setting) that fails for any object that extends
SparkNonSerializable. I wasn't able to make it work, though: the serializer
does get used, but the condition t.isInstanceOf[SparkNonSerializable] is
never true.
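A stdlib-only sketch of the check described above, applied to every object in the serialized graph rather than only to the top-level object. It relies on java.io.ObjectOutputStream's enableReplaceObject/replaceObject hook, which is called for each object as it is written; GuardedObjectOutputStream and serializeWith are hypothetical names, and this makes no claim about how Spark's own serializers are wired. The last check illustrates that the top-level object handed to a serializer can be the closure rather than Foo, which is one plausible way a top-level isInstanceOf test never fires:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream, OutputStream}
import scala.util.Try

object GuardDemo {
  // Marker trait from the post.
  trait SparkNonSerializable

  // Foo stays Serializable so it can still be saved to disk.
  class Foo extends Serializable with SparkNonSerializable {
    val x = 5
    def mapper: Int => Int = _ * x // captures `this`
  }

  // Hypothetical guard: rejects any marked object found anywhere in the graph.
  class GuardedObjectOutputStream(out: OutputStream) extends ObjectOutputStream(out) {
    enableReplaceObject(true) // ask the stream to call replaceObject for each object

    override protected def replaceObject(obj: AnyRef): AnyRef = obj match {
      case marked: SparkNonSerializable =>
        throw new NotSerializableException(s"refusing to serialize ${marked.getClass.getName}")
      case other => other
    }
  }

  def serializeWith(obj: AnyRef, guarded: Boolean): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out =
      if (guarded) new GuardedObjectOutputStream(bytes) else new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val foo = new Foo
    println(s"plain stream, Foo itself: ${Try(serializeWith(foo, guarded = false)).isSuccess}")
    println(s"guarded stream, closure: ${Try(serializeWith(foo.mapper, guarded = true)).isSuccess}")
    println(s"closure is marked: ${foo.mapper.isInstanceOf[SparkNonSerializable]}")
  }
}
```

Because Foo stays Serializable, saving it with a plain stream still works; only the guarded stream refuses a graph that contains it.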

Here is my code:
----
import java.io.{InputStream, NotSerializableException, OutputStream}
import java.nio.ByteBuffer

import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{DeserializationStream, JavaSerializer,
  SerializationStream, SerializerInstance}

trait SparkNonSerializable

class MySerializer(conf: SparkConf) extends JavaSerializer(conf) {
  override def newInstance(): SerializerInstance = {
    val inst = super.newInstance()

    // Fails if the given (top-level) object carries the marker trait.
    def check(t: Any): Unit =
      if (t.isInstanceOf[SparkNonSerializable])
        throw new NotSerializableException(t.getClass.getName)

    new SerializerInstance {
      // Note: objects written or read through these streams bypass the check.
      override def serializeStream(s: OutputStream): SerializationStream =
        inst.serializeStream(s)

      override def deserializeStream(s: InputStream): DeserializationStream =
        inst.deserializeStream(s)

      override def serialize[T: ClassTag](t: T): ByteBuffer = {
        check(t)
        inst.serialize(t)
      }

      override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
        val t = inst.deserialize[T](bytes)
        check(t)
        t
      }

      override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
        val t = inst.deserialize[T](bytes, loader)
        check(t)
        t
      }
    }
  }
}
----


My questions are:
1. Do you see why my custom serializer can't catch objects with that trait?

2. Do you have any other ideas for preventing Foo from being serialized?
My solution might be OK for tests, but I'm a bit reluctant to use my own
serializer in production code.
Thanks,
Lev.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Prevent-spark-from-serializing-some-objects-tp24700.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


