spark-issues mailing list archives

From "Richard Yu (JIRA)" <>
Subject [jira] [Commented] (SPARK-18258) Sinks need access to offset representation
Date Wed, 27 Jun 2018 11:13:00 GMT


Richard Yu commented on SPARK-18258:

Just a question: I noticed that in {{KafkaSink}}'s implementation of {{addBatch}}, there is
a preexisting schema that must be followed. More specifically, a {{ProducerRecord}} (provided
by Kafka) is sent to the producer with room only for the topic name, a key, and a value.
However, there does not appear to be any way to also export the data involving the {{start}}
and {{end}} {{OffsetSeq}}s to Kafka. So am I right to assume that the new data is to be used
for checkpointing purposes only?

> Sinks need access to offset representation
> ------------------------------------------
>                 Key: SPARK-18258
>                 URL:
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>            Reporter: Cody Koeninger
>            Priority: Major
> Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked into a particular streaming engine.
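A minimal sketch of the pattern described above, in plain Scala with no Spark dependency: results and the end-offset identifier are applied in one atomic step, so after a restart the offsets can be read back from the same place as the results, independent of the engine's own checkpoint. `InMemoryStore`, `Committed` and the JSON offset string are hypothetical stand-ins for a real transactional database and a real source's offset representation.

```scala
object TransactionalSinkSketch {
  // Hypothetical committed state: all results so far plus the offsets they correspond to.
  final case class Committed(results: Vector[String], offsetsJson: Option[String])

  // Stand-in for a transactional database; the lock plays the role of a transaction.
  final class InMemoryStore {
    private var state = Committed(Vector.empty, None)

    // Results and the offset identifier become visible together or not at all.
    def commit(batch: Seq[String], endOffsetsJson: String): Unit = synchronized {
      state = Committed(state.results ++ batch, Some(endOffsetsJson))
    }

    def snapshot: Committed = synchronized(state)
  }

  // On restart, resume from whatever offsets were stored alongside the results.
  def resumeFrom(store: InMemoryStore): Option[String] = store.snapshot.offsetsJson
}
```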
> I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink API, that would work as well.
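The API-preserving alternative mentioned above could look roughly like this sketch, assuming the engine records each batch's start and end offsets under its batch id before calling the sink. `BatchOffsetLog` and `LookupSink` are hypothetical names, and the DataFrame is modeled as `Seq[String]` so the example runs without Spark.

```scala
import scala.collection.concurrent.TrieMap

object BatchOffsetLookupSketch {
  // Hypothetical log: batchId -> (start offsets, end offsets), both as JSON strings.
  // start is None for the first batch, which has nothing committed before it.
  final class BatchOffsetLog {
    private val entries = TrieMap.empty[Long, (Option[String], String)]

    def record(batchId: Long, start: Option[String], end: String): Unit =
      entries.update(batchId, (start, end))

    def lookup(batchId: Long): Option[(Option[String], String)] = entries.get(batchId)
  }

  // Keeps the existing (batchId, data) shape and resolves the offsets itself.
  final class LookupSink(offsetLog: BatchOffsetLog) {
    // Stands in for the external store: (batchId, row count, end offsets).
    var written: Vector[(Long, Int, String)] = Vector.empty

    def addBatch(batchId: Long, data: Seq[String]): Unit = {
      val (_, end) = offsetLog.lookup(batchId).getOrElse(
        throw new IllegalStateException(s"no offsets recorded for batch $batchId"))
      // A real sink would write `data` and `end` in a single transaction here.
      written = written :+ ((batchId, data.size, end))
    }
  }
}
```

Either way, the sink still stores `end` alongside its results; only the way it obtains the offsets differs.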
> I'm assuming we don't need the same level of access to offsets throughout a job as, e.g., the Kafka DStream gives, because Sinks are the main place that should need them.
> After SPARK-17829 is complete and offsets have a .json method, an API for this ticket might look like:
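> {code}
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.execution.streaming.OffsetSeq
>
> trait Sink {
>   def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
> }
> {code}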
> where start and end would be provided by StreamExecution.runBatch using committedOffsets and availableOffsets.
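To make the proposed data flow concrete, here is a minimal sketch in which a toy engine passes its last committed offsets as `start` and the currently available offsets as `end`, then advances the committed offsets only after the sink returns. `Offsets`, `OffsetAwareSink` and `MiniEngine` are hypothetical simplifications, not Spark's `OffsetSeq`, `Sink` or `StreamExecution`.

```scala
object RunBatchSketch {
  // Hypothetical stand-in for OffsetSeq: one optional serialized offset per source.
  final case class Offsets(perSource: Seq[Option[String]])

  // The proposed four-argument shape, with the DataFrame modeled as Seq[String].
  trait OffsetAwareSink {
    def addBatch(batchId: Long, data: Seq[String], start: Offsets, end: Offsets): Unit
  }

  // Toy engine: start = last committed offsets, end = offsets available now.
  final class MiniEngine(sink: OffsetAwareSink, numSources: Int) {
    private var committed = Offsets(Seq.fill(numSources)(None))
    private var nextBatchId = 0L

    def runBatch(available: Offsets, data: Seq[String]): Unit = {
      sink.addBatch(nextBatchId, data, committed, available)
      // Only advance once the sink has accepted the batch.
      committed = available
      nextBatchId += 1
    }
  }
}
```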
> I'm not 100% certain that the offsets in the seq could always be mapped back to the correct source when restarting complicated multi-source jobs, but I think it'd be sufficient. Passing the string/json representation of the seq instead of the seq itself would probably be sufficient as well, but the convention of rendering a None as "-" in the JSON is arguably a little idiosyncratic to parse, and the constant defining it is private.
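The parsing concern above can be illustrated with a small sketch. It assumes the per-source entries have already been split out of the rendered sequence and that a source with no offset is written as the literal `-`, as the ticket describes. Because the real constant is private, a consumer would have to duplicate it, which is what the hypothetical `NoOffsetMarker` below does.

```scala
object OffsetRenderingSketch {
  // Duplicated sentinel (hypothetical name): the ticket notes the real constant is private.
  val NoOffsetMarker = "-"

  // Render one entry per source, using the sentinel where a source has no offset.
  def render(offsets: Seq[Option[String]]): Seq[String] =
    offsets.map(_.getOrElse(NoOffsetMarker))

  // Invert the rendering: the sentinel becomes None, anything else is an offset JSON string.
  def parse(rendered: Seq[String]): Seq[Option[String]] =
    rendered.map(s => if (s == NoOffsetMarker) None else Some(s))
}
```

The round trip only works because no valid offset JSON is the bare string `-`; passing structured offsets, as in the ticket's first option, avoids depending on that convention.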

This message was sent by Atlassian JIRA
