spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Reynold Xin (JIRA)" <>
Subject [jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
Date Fri, 13 Oct 2017 01:41:01 GMT


Reynold Xin commented on SPARK-20928:

OK got it - you are basically saying if we can send the offset associated with each record
to the sink, then the sink can potentially implement some sort of dedup to guarantee idempotency.
For most sinks this probably won't work, but if a particular sink offers a way to do it, then
end-to-end exactly once can be accomplished.

> Continuous Processing Mode for Structured Streaming
> ---------------------------------------------------
>                 Key: SPARK-20928
>                 URL:
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Michael Armbrust
>              Labels: SPIP
> Given the current Source API, the minimum possible latency for any record is bounded
by the amount of time that it takes to launch a task.  This limitation is a result of the
fact that {{getBatch}} requires us to know both the starting and the ending offset, before
any tasks are launched.  In the worst case, the end-to-end latency is actually closer to the
average batch time + task launching time.
> For applications where latency is more important than exactly-once output however, it
would be useful if processing could happen continuously.  This would allow us to achieve fully
pipelined reading and writing from sources such as Kafka.  This kind of architecture would
make it possible to process records with end-to-end latencies on the order of 1 ms, rather
than the 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like the following
rough sketch:
> {code}
>   trait Epoch {
>     def data: DataFrame
>     /** The exclusive starting position for `data`. */
>     def startOffset: Offset
>     /** The inclusive ending position for `data`.  Incrementally updated during processing,
but not complete until execution of the query plan in `data` is finished. */
>     def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], limits: Limits):
> {code}
> The above would allow us to build an alternative implementation of {{StreamExecution}}
that processes continuously with much lower latency and only stops processing when needing
to reconfigure the stream (either due to a failure or a user requested change in parallelism.

This message was sent by Atlassian JIRA

To unsubscribe, e-mail:
For additional commands, e-mail:

View raw message