flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: How to ensure exactly-once semantics in output to Kafka?
Date Fri, 05 Feb 2016 13:23:25 GMT
Hi Niels!

That could actually work, given a way to identify messages with a unique ID.

Would be quite an exercise to implement...

Stephan


On Fri, Feb 5, 2016 at 2:14 PM, Niels Basjes <Niels@basjes.nl> wrote:

> @Stephan;
> Kafka keeps the messages for a configured retention time (e.g. a few
> days/weeks).
> So my idea is based on the fact that Kafka still has all the messages and
> that I can read those messages back from Kafka to check whether or not I
> should write them again.
>
> Let me illustrate what I had in mind:
> I write messages to Kafka and at the moment of the checkpoint the last
> message ID I wrote is 5.
> Then I write 6,7,8
> FAIL
> Recover:
> Open a reader starting at message 5
> Get message 6 --> Read from Kafka --> Already have this --> Skip
> Get message 7 --> Read from Kafka --> Already have this --> Skip
> Get message 8 --> Read from Kafka --> Already have this --> Skip
> Get message 9 --> Read from Kafka --> Not yet in Kafka --> Write and
> resume normal operations.
>
> Like I said: this is just a first rough idea of a possible direction in
> which this could be solved without the latency impact of buffering.
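>
> To make it concrete, a very rough sketch of the read-back step (all names
> like consumerProps, extractId() and checkpointedOffset are made up, and
> I'm assuming the new Kafka 0.9 consumer so we can seek to a stored
> offset):
>
> import java.util.*;
> import org.apache.kafka.clients.consumer.*;
> import org.apache.kafka.common.TopicPartition;
>
> // On recovery: re-read, from the offset stored in the checkpoint, the
> // IDs of the messages that already made it to the output topic.
> Set<String> readBackIds(Properties consumerProps, String topic,
>                         int partition, long checkpointedOffset) {
>     KafkaConsumer<byte[], byte[]> reader =
>         new KafkaConsumer<>(consumerProps);
>     TopicPartition tp = new TopicPartition(topic, partition);
>     reader.assign(Collections.singletonList(tp));
>     reader.seek(tp, checkpointedOffset);        // position of message 5
>     Set<String> alreadyWritten = new HashSet<>();
>     for (ConsumerRecord<byte[], byte[]> rec : reader.poll(1000)) {
>         alreadyWritten.add(extractId(rec));     // IDs 6, 7, 8
>     }
>     reader.close();
>     return alreadyWritten;
> }
>
> // Then, for every replayed message: skip it if its ID is in the set;
> // otherwise (message 9) write it and resume normal operation.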
>
> Niels Basjes
>
>
> On Fri, Feb 5, 2016 at 2:06 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> @Niels: I don't fully understand your approach so far.
>>
>> If you write a message to Kafka between two checkpoints, where do you
>> store the information that this particular message has already been
>> written (I think this would be the ID in your example)?
>> Such information would need to be persisted for every written message
>> (or every small group of messages).
>>
>> Stephan
>>
>>
>> On Fri, Feb 5, 2016 at 1:41 PM, Niels Basjes <Niels@basjes.nl> wrote:
>>
>>> Hi,
>>>
>>> Buffering the data (in all cases) would hurt the latency so much that
>>> Flink would effectively revert to micro-batching (where the batch size
>>> is the checkpoint period) with regard to the output.
>>>
>>> My initial thoughts on how to solve this were as follows:
>>> 1) The output persists the ID of the last message it wrote to Kafka in
>>> the checkpoint.
>>> 2) Upon recovery the sink would
>>> 2a) Record the offset Kafka has reached at that point in time.
>>> 2b) For every 'new' message, check whether it must be written by reading
>>> from Kafka (starting at the offset in the checkpoint); if the message is
>>> already present, skip it.
>>> 3) Once a message arrives that has not yet been written, write it.
>>> Under the assumption that the messages arrive in the same order as
>>> before, the sink can now simply run as normal.
>>>
>>> This way the performance is only impacted during the (short) period
>>> right after recovering from a failure.
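>>>
>>> In Flink terms I imagine the sink would look roughly like this (just a
>>> skeleton; Message, alreadyInKafka() and writeToKafka() are made-up
>>> names):
>>>
>>> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>>
>>> public class IdempotentKafkaSink extends RichSinkFunction<Message>
>>>         implements Checkpointed<Long> {
>>>
>>>     private long lastWrittenId;   // step 1: goes into the checkpoint
>>>     private boolean recovering;
>>>
>>>     @Override
>>>     public void invoke(Message msg) throws Exception {
>>>         if (recovering && alreadyInKafka(msg.getId())) {
>>>             return;               // step 2b: duplicate, skip it
>>>         }
>>>         recovering = false;       // step 3: back to normal operation
>>>         writeToKafka(msg);
>>>         lastWrittenId = msg.getId();
>>>     }
>>>
>>>     @Override
>>>     public Long snapshotState(long checkpointId, long timestamp) {
>>>         return lastWrittenId;
>>>     }
>>>
>>>     @Override
>>>     public void restoreState(Long state) {
>>>         lastWrittenId = state;
>>>         recovering = true;        // step 2a: enter read-back mode
>>>     }
>>> }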
>>>
>>> What do you think?
>>>
>>> Niels Basjes
>>>
>>>
>>>
>>> On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> Hi Niels!
>>>>
>>>> In general, exactly-once output requires transactional cooperation from
>>>> the target system. Kafka has that on its roadmap; we should be able to
>>>> integrate it once it is out.
>>>> That means output is "committed" upon completed checkpoints, which
>>>> guarantees nothing is written multiple times.
>>>>
>>>> Chesnay is working on an interesting prototype as a generic solution
>>>> (also for Kafka, as long as it does not have that feature):
>>>> It buffers the data in the sink persistently (using the fault-tolerance
>>>> state backends) and pushes the results out on notification of a
>>>> completed checkpoint.
>>>> That gives you exactly-once semantics, but involves an extra
>>>> materialization of the data.
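>>>>
>>>> In code, the shape of such a buffering sink is roughly this
>>>> (simplified; names are made up):
>>>>
>>>> import java.util.ArrayList;
>>>> import org.apache.flink.runtime.state.CheckpointListener;
>>>> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
>>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>>>
>>>> public class BufferingSink extends RichSinkFunction<String>
>>>>         implements Checkpointed<ArrayList<String>>, CheckpointListener {
>>>>
>>>>     private ArrayList<String> buffer = new ArrayList<>();
>>>>
>>>>     @Override
>>>>     public void invoke(String value) {
>>>>         buffer.add(value);            // hold back until the checkpoint
>>>>     }
>>>>
>>>>     @Override
>>>>     public ArrayList<String> snapshotState(long id, long ts) {
>>>>         return buffer;                // persisted by the state backend
>>>>     }
>>>>
>>>>     @Override
>>>>     public void restoreState(ArrayList<String> state) {
>>>>         buffer = state;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void notifyCheckpointComplete(long checkpointId) {
>>>>         for (String v : buffer) {
>>>>             // push 'v' to Kafka here; this checkpoint will never be
>>>>             // replayed, so nothing is written twice
>>>>         }
>>>>         buffer.clear();
>>>>     }
>>>> }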
>>>>
>>>>
>>>> I think there is actually a fundamental latency issue with
>>>> "exactly-once sinks", no matter how you implement them in any system:
>>>> You can only commit once you are sure that everything went well, up to
>>>> a specific point beyond which you are sure no replay will ever be
>>>> needed.
>>>>
>>>> So the latency in Flink for an exactly-once output would be at least
>>>> the checkpoint interval.
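>>>>
>>>> I.e. whatever you configure here also becomes a lower bound on the
>>>> sink's output latency:
>>>>
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.enableCheckpointing(5000);  // commit at most every 5 seconds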
>>>>
>>>> I'm eager to hear your thoughts on this.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <Niels@basjes.nl> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It is my understanding that the exactly-once semantics regarding the
>>>>> input from Kafka are based on the source storing, in each checkpoint,
>>>>> the offset it had reached at the moment of the checkpoint.
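>>>>>
>>>>> I.e. roughly this (assuming the current 0.10.x Kafka connector; the
>>>>> addresses and names are placeholders):
>>>>>
>>>>> import java.util.Properties;
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
>>>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>>>>>
>>>>> StreamExecutionEnvironment env =
>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> env.enableCheckpointing(5000); // offsets are stored in each checkpoint
>>>>>
>>>>> Properties props = new Properties();
>>>>> props.setProperty("bootstrap.servers", "broker:9092");
>>>>> props.setProperty("zookeeper.connect", "zk:2181");
>>>>> props.setProperty("group.id", "my-group");
>>>>>
>>>>> // on failure, Flink rewinds the source to the checkpointed offset
>>>>> DataStream<String> stream = env.addSource(
>>>>>     new FlinkKafkaConsumer082<>("my-input",
>>>>>         new SimpleStringSchema(), props));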
>>>>>
>>>>> My question is: how does that work for a sink? How can I make sure
>>>>> that (in light of failures) each message that is read from Kafka (my
>>>>> input) is written to Kafka (my output) exactly once?
>>>>>
>>>>>
>>>>> --
>>>>> Best regards / Met vriendelijke groeten,
>>>>>
>>>>> Niels Basjes
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
