beam-commits mailing list archives

From "Kobi Salant (JIRA)" <>
Subject [jira] [Commented] (BEAM-2669) Kryo serialization exception when DStreams containing non-Kryo-serializable data are cached
Date Tue, 29 Aug 2017 13:20:00 GMT


Kobi Salant commented on BEAM-2669:

The PR contains the latest changes for the Kryo exception.

The first step was to remove the hard-coded Kryo serializer settings, but that opened another
issue: all our RDDs/DStreams are of type WindowedValue, which is not Java-serializable. A
discussion on the dev list raised valid concerns about not using the coders and breaking our
agreement with the user.

So we moved in a different direction: when caching/persisting, we map to bytes with coders, so
we do not need to serialize Beam and user classes. Due to this change, the cached RDD is no
longer the PCollection's RDD and can no longer be inspected, so we removed the storage
level test.
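
As a minimal sketch of the approach (illustrative names and shapes, e.g. {{persistViaCoder}};
not the actual runner code), caching through the coder means only byte arrays ever reach
Spark's serializer:

{code:java}
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

class CoderCacheSketch {
  // Persist a WindowedValue RDD through its Beam coder: only byte[] ever
  // reaches Spark's serializer, so neither WindowedValue nor user classes
  // need to be Java- or Kryo-serializable.
  static <T> JavaRDD<WindowedValue<T>> persistViaCoder(
      JavaRDD<WindowedValue<T>> rdd,
      Coder<WindowedValue<T>> coder, // the PCollection's WindowedValueCoder
      StorageLevel level) {
    JavaRDD<byte[]> bytes =
        rdd.map(wv -> CoderUtils.encodeToByteArray(coder, wv));
    bytes.persist(level);
    // Downstream transforms read a decoded view of the cached bytes, not
    // the original PCollection's RDD, which is why the storage level test
    // had to be removed.
    return bytes.map(b -> CoderUtils.decodeFromByteArray(coder, b));
  }
}
{code}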

> Kryo serialization exception when DStreams containing non-Kryo-serializable data are cached
> -------------------------------------------------------------------------------------------
>                 Key: BEAM-2669
>                 URL:
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 0.4.0, 0.5.0, 0.6.0, 2.0.0
>            Reporter: Aviem Zur
>            Assignee: Kobi Salant
> Today, when we detect re-use of a dataset in a pipeline in the Spark runner, we eagerly
> cache it to avoid calculating the same data multiple times.
> When the dataset is bounded, which in Spark is represented by an {{RDD}}, we call
> {{RDD#persist}} and use the storage level provided by the user via {{SparkPipelineOptions}}.
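> A sketch of what the bounded path amounts to (assumed shapes; {{options}} is the pipeline's
> {{SparkPipelineOptions}} and {{rdd}} the dataset's {{JavaRDD}}):
> {code:java}
> // Bounded: honor the user's storage level string, e.g. "MEMORY_ONLY".
> StorageLevel level = StorageLevel.fromString(options.getStorageLevel());
> rdd.persist(level);
> {code}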
> When the dataset is unbounded, which in Spark is represented by a {{DStream}}, we call
> {{DStream.cache()}}, which defaults to persisting the {{DStream}} using storage level
> {{MEMORY_ONLY_SER}} ({{DStream.scala}}).
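> By contrast, the unbounded path ignores the user's setting entirely:
> {code:java}
> // Unbounded: the storage level is pinned by the DStream API itself.
> dStream.cache(); // equivalent to persist(StorageLevel.MEMORY_ONLY_SER)
> {code}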
> Storage level {{MEMORY_ONLY_SER}} means Spark will serialize the data using its configured
> serializer. Since we configure this to be Kryo in a hard-coded fashion, the data will be
> serialized using Kryo.
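> The hard-coded setting in question is of this form (illustrative, not the runner's exact
> line):
> {code:java}
> SparkConf conf = new SparkConf()
>     .set("spark.serializer", KryoSerializer.class.getName());
> {code}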
> Due to this, if your {{DStream}} contains non-Kryo-serializable data, you will encounter
> Kryo serialization exceptions and your task will fail.
> Possible actions we should consider:
> # Remove the hard-coded Spark serializer configuration; this should be taken from the
> user's Spark configuration, as there is no real reason for us to interfere with it.
> # Use the user's configured storage level from {{SparkPipelineOptions}} when caching
> unbounded datasets ({{DStream}}s), the same as we do for bounded datasets (see the sketch
> after this list).
> # Make caching of re-used datasets configurable (enable/disable) in {{SparkPipelineOptions}},
> although overloading our configuration with more options is always something not to be
> taken lightly.
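> For action 2, the unbounded path would simply mirror the bounded one (assumed shapes, not
> the final fix):
> {code:java}
> // Persist with the user's configured level instead of relying on
> // DStream.cache()'s MEMORY_ONLY_SER default.
> dStream.persist(StorageLevel.fromString(options.getStorageLevel()));
> {code}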
