flink-user mailing list archives

From Paris Carbone <par...@kth.se>
Subject Re: How to ensure exactly-once semantics in output to Kafka?
Date Fri, 05 Feb 2016 12:28:54 GMT
Hi Gabor,

The sinks should be aware that the global checkpoint is indeed persisted before emitting, so they
have to wait until they are notified of its completion before pushing to Kafka. The
current view of the local state is not the actual persisted view, so checking against it is like
relying on dirty state. Imagine the following scenario:

1) the sink pushes record k to Kafka and updates its local buffer for k
2) the sink snapshots k and the rest of its state on the checkpoint barrier
3) the global checkpoint fails for some reason (e.g. another sink subtask failed) and the job
gets restarted
4) the sink pushes record k to Kafka again, since the last global snapshot did not complete
and k is therefore not in the restored local buffer

Chesnay’s approach seems to be a valid, best-effort solution for the time being.

Paris
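
A minimal, Flink-agnostic sketch of the buffering approach Paris and Chesnay describe: hold records back until the checkpoint that covers them is globally complete. The class and method names (onRecord, onCheckpointBarrier, onCheckpointComplete) are purely illustrative stand-ins for Flink's sink and checkpoint-notification hooks, and a real implementation would keep the buffers in Flink's checkpointed state rather than on the heap.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    // Sketch only: records are held back until the checkpoint covering them
    // is globally complete, so a restart can never lead to a second push.
    class BufferingExactlyOnceSink {

        // Records received since the last checkpoint barrier. In a real sink
        // this (and pendingBuffers) would live in checkpointed state.
        private List<String> currentBuffer = new ArrayList<>();

        // Buffers that were snapshotted but whose checkpoint is not yet confirmed.
        private final ArrayDeque<List<String>> pendingBuffers = new ArrayDeque<>();

        // Stand-in for the actual Kafka producer.
        private final Consumer<String> kafkaProducer;

        BufferingExactlyOnceSink(Consumer<String> kafkaProducer) {
            this.kafkaProducer = kafkaProducer;
        }

        // Called for every incoming record: nothing is pushed to Kafka yet.
        void onRecord(String record) {
            currentBuffer.add(record);
        }

        // Called when the checkpoint barrier arrives: seal the current buffer.
        void onCheckpointBarrier() {
            pendingBuffers.addLast(currentBuffer);
            currentBuffer = new ArrayList<>();
        }

        // Called only once the *global* checkpoint has completed: now it is
        // safe to push, because no replay will ever roll back behind this point.
        void onCheckpointComplete() {
            List<String> ready = pendingBuffers.pollFirst();
            if (ready != null) {
                ready.forEach(kafkaProducer);
            }
        }
    }

The price, as Stephan notes further down the thread, is that output latency is at least the checkpoint interval, plus the extra materialization of the buffered data.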

> On 05 Feb 2016, at 13:09, Gábor Gévay <ggab90@gmail.com> wrote:
> 
> Hello,
> 
>> I think that there is actually a fundamental latency issue with
>> "exactly once sinks", no matter how you implement them in any systems:
>> You can only commit once you are sure that everything went well,
>> to a specific point where you are sure no replay will ever be needed.
> 
> What if the persistent buffer in the sink were used to determine
> which data elements should be emitted in case of a replay? I mean, the
> sink pushes everything as soon as it arrives and also writes
> everything to the persistent buffer; then, in case of a replay, it
> looks into the buffer before pushing each element, and only does the
> push if the buffer says that the element was not pushed before.
> 
> Best,
> Gábor
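
A minimal, Flink-agnostic sketch of the idea Gábor describes above: push immediately, record what was pushed in a persistent buffer, and consult that buffer during a replay. The names (DedupOnReplaySink, onRecord, the recordId parameter) are illustrative, and the in-memory set merely stands in for the persistent buffer.

    import java.util.HashSet;
    import java.util.Set;
    import java.util.function.Consumer;

    // Sketch only: deduplication on replay, driven by a buffer of pushed records.
    class DedupOnReplaySink {

        // Stand-in for the persistent buffer of already-pushed record ids; for
        // the idea to work, this must be durable and consistent with what
        // actually reached Kafka, not just local operator state.
        private final Set<String> alreadyPushed = new HashSet<>();

        // Stand-in for the actual Kafka producer.
        private final Consumer<String> kafkaProducer;

        DedupOnReplaySink(Consumer<String> kafkaProducer) {
            this.kafkaProducer = kafkaProducer;
        }

        void onRecord(String recordId, String record) {
            // During a replay the same record arrives again; only push it if
            // the buffer has no evidence of an earlier push.
            if (alreadyPushed.add(recordId)) {
                kafkaProducer.accept(record);
            }
        }
    }

As Paris points out at the top of the thread, the catch is that a buffer kept in ordinary checkpointed state only becomes durable when the global checkpoint completes; after a restore it reflects the last completed checkpoint rather than what was actually pushed, so step 4 of his scenario still produces a duplicate.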
> 
> 
> 2016-02-05 11:57 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>> Hi Niels!
>> 
>> In general, exactly-once output requires transactional cooperation from the
>> target system. Kafka has that on the roadmap; we should be able to integrate
>> it once it is out.
>> That means output is "committed" upon completed checkpoints, which
>> guarantees nothing is written multiple times.
>> 
>> Chesnay is working on an interesting prototype as a generic solution (also
>> for Kafka, while it doesn't yet have that feature):
>> It buffers the data in the sink persistently (using the fault tolerance
>> state backends) and pushes the results out on notification of a completed
>> checkpoint.
>> That gives you exactly once semantics, but involves an extra materialization
>> of the data.
>> 
>> 
>> I think that there is actually a fundamental latency issue with "exactly
>> once sinks", no matter how you implement them in any systems:
>> You can only commit once you are sure that everything went well, to a
>> specific point where you are sure no replay will ever be needed.
>> 
>> So the latency in Flink for an exactly-once output would be at least the
>> checkpoint interval.
>> 
>> I'm eager to hear your thoughts on this.
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <Niels@basjes.nl> wrote:
>>> 
>>> Hi,
>>> 
>>> It is my understanding that the exactly-once semantics regarding the input
>>> from Kafka are based on the checkpointing in the source component, which
>>> retains the offset it was at when the checkpoint was taken.
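
A minimal sketch of the offset mechanism described above, independent of the actual FlinkKafkaConsumer; the class and method names are illustrative. The only state the source needs is its read position, and after a restore everything past the checkpointed offset is read and emitted again, which is exactly why the sink side sees replays.

    // Sketch only: the offset is the source's entire checkpointed state.
    class OffsetTrackingSource {

        private long currentOffset; // read position in the Kafka partition

        // Taken when the checkpoint barrier passes the source: remember where we are.
        long snapshotState() {
            return currentOffset;
        }

        // After a failure: seek back to the last checkpointed position, so every
        // record after it is read and emitted downstream a second time.
        void restoreState(long checkpointedOffset) {
            currentOffset = checkpointedOffset;
        }

        // Reading one record simply advances the position.
        void emitNext() {
            currentOffset++;
        }
    }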
>>> 
>>> My question is how does that work for a sink? How can I make sure that (in
>>> light of failures) each message that is read from Kafka (my input) is
>>> written to Kafka (my output) exactly once?
>>> 
>>> 
>>> --
>>> Best regards / Met vriendelijke groeten,
>>> 
>>> Niels Basjes
>> 
>> 
