spark-user mailing list archives

From Jong Wook Kim <jongw...@nyu.edu>
Subject Re: Streaming checkpoints and logic change
Date Wed, 08 Jul 2015 17:38:39 GMT
Hi TD, you answered the wrong question. If you read the subject, mine was
specifically about checkpointing. I'll elaborate.

The checkpoint, which is a serialized DStream DAG, contains all the
metadata and *logic*, such as the functions passed to e.g. DStream.transform().

This logic is serialized as an anonymous inner class at the JVM level, and will
not tolerate the slightest logic change, because the class signature changes
and the new code cannot deserialize the checkpoint, which contains the
serialized form from the previous version.

Logic changes are extremely common in stream processing. Say I have a log
transformer which extracts certain fields of logs from a Kafka stream, and I
want to add another field to extract. This involves a DStream logic change, so
it cannot be done using checkpoints, and I can't even achieve an at-least-once
guarantee.

My current workaround is to read the current offsets by casting each batch's
RDD to HasOffsetRanges
<https://github.com/apache/spark/blob/v1.4.1-rc3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala#L36-L39>,
saving them to ZooKeeper, and passing a fromOffsets parameter read from
ZooKeeper when creating a directStream. I've settled on this approach for
now, but I want to know how the makers of Spark Streaming think about this
drawback of checkpointing.
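
Roughly, the workaround looks like the sketch below (loadOffsets and
saveOffsets are placeholders for my own ZooKeeper helpers, not Spark API, and
the app name and broker list are made up):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val ssc = new StreamingContext(new SparkConf().setAppName("log-extractor"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// offsets written by the previous run; resume exactly where it left off
val fromOffsets: Map[TopicAndPartition, Long] = loadOffsets()

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

stream.foreachRDD { rdd =>
  // each RDD produced by the direct stream implements HasOffsetRanges
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... extract fields and write the results ...
  saveOffsets(offsetRanges)  // persist to ZooKeeper only after processing succeeds
}

ssc.start()
ssc.awaitTermination()

This survives a redeploy with changed logic because nothing from the old
DStream graph needs to be deserialized; only the plain offsets are carried over.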

If anyone had similar experience, suggestions will be appreciated.

Jong Wook



On 9 July 2015 at 02:13, Tathagata Das <tdas@databricks.com> wrote:

> You can use DStream.transform for some stuff. Transform takes an RDD => RDD
> function that allows arbitrary RDD operations to be done on the RDDs of a
> DStream. This function gets evaluated on the driver on every batch
> interval. If you are smart about writing the function, it can do different
> stuff at different intervals. For example, you can always use a
> continuously updated set of filters:
>
> dstream.transform { rdd =>
>    val broadcastedFilters = Filters.getLatest()
>    val newRDD = rdd.filter { x => broadcastedFilters.value.filter(x) }
>    newRDD
> }
>
>
> The function Filters.getLatest() will return the latest set of filters
> that is broadcasted out, and as the transform function is processed in
> every batch interval, it will always use the latest filters.
>
> HTH.
>
> TD
>
> On Wed, Jul 8, 2015 at 10:02 AM, Jong Wook Kim <jongwook@nyu.edu> wrote:
>
>> I just asked this question at the streaming webinar that just ended, but
>> the speakers didn't answer it, so I'm throwing it here:
>>
>> AFAIK checkpoints are the only recommended method for running Spark
>> Streaming without data loss. But it involves serializing the entire DStream
>> graph, which prohibits any logic changes. How should I update or fix the
>> logic of a running streaming app without any data loss?
>>
>> Jong Wook
>>
>
>
