flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Niels Basjes <Ni...@basjes.nl>
Subject Re: How to ensure exactly-once semantics in output to Kafka?
Date Fri, 05 Feb 2016 13:14:48 GMT
@Stephan;
Kafka keeps the messages for a configured TTL (i.e. a few days/weeks).
So my idea is based on the fact that Kafka has all the messages and that I
can read those messages from Kafka to validate if I should or should not
write them again.

Let me illustrate what I had in mind:
I write messages to Kafka and at the moment of the checkpoint the last
message ID I wrote is 5.
Then I write 6,7,8
FAIL
Recover:
Open a reader starting at message 5
Get message 6 -> Read from Kafka --> Already have this --> Skip
Get message 7 -> Read from Kafka --> Already have this --> Skip
Get message 8 -> Read from Kafka --> Already have this --> Skip
Get message 9 -> Read from Kafka --> Not yet in Kafka --> Write and resume
normal operations.

Like I said: This is just the first rough idea I had on a possible
direction how this can be solved without the latency impact of buffering.

Niels Basjes


On Fri, Feb 5, 2016 at 2:06 PM, Stephan Ewen <sewen@apache.org> wrote:

> @Niels: I don't fully understand your approach so far.
>
> If you write a message to Kafka between two checkpoints, where do you
> store the information that this particular message is already written (I
> think this would be the ID in your example).
> Such an information would need to be persisted for every written messages
> (or very small group of messages).
>
> Stephan
>
>
> On Fri, Feb 5, 2016 at 1:41 PM, Niels Basjes <Niels@basjes.nl> wrote:
>
>> Hi,
>>
>> Buffering the data (in all cases) would hurt the latency so much that
>> Flink is effectively reverting to microbatching (where batch size is
>> checkpoint period) with regards of the output.
>>
>> My initial thoughts on how to solve this was as follows:
>> 1) The output persists the ID of the last message it wrote to Kafka in
>> the checkpoint.
>> 2) Upon recovery the sink would
>> 2a) Record the offset Kafka is at at that point in time
>> 2b) For all 'new' messages validate if it must write this message by
>> reading from Kafka (starting at the offset in the checkpoint) and if the
>> message is already present it would skip it.
>> 3) If a message arrives that has not yet written the message is written.
>> Under the assumption that the messages arrive in the same order as before
>> the sink can now simply run as normal.
>>
>> This way the performance is only impacted in the (short) period after the
>> recovery of a disturbance.
>>
>> What do you think?
>>
>> Niels Basjes
>>
>>
>>
>> On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Hi Niels!
>>>
>>> In general, exactly once output requires transactional cooperation from
>>> the target system. Kafka has that on the roadmap, we should be able to
>>> integrate that once it is out.
>>> That means output is "committed" upon completed checkpoints, which
>>> guarantees nothing is written multiple times.
>>>
>>> Chesnay is working on an interesting prototype as a generic solution
>>> (also for Kafka, while they don't have that feature):
>>> It buffers the data in the sink persistently (using the fault tolerance
>>> state backends) and pushes the results out on notification of a completed
>>> checkpoint.
>>> That gives you exactly once semantics, but involves an extra
>>> materialization of the data.
>>>
>>>
>>> I think that there is actually a fundamental latency issue with "exactly
>>> once sinks", no matter how you implement them in any systems:
>>> You can only commit once you are sure that everything went well, to a
>>> specific point where you are sure no replay will ever be needed.
>>>
>>> So the latency in Flink for an exactly-once output would be at least the
>>> checkpoint interval.
>>>
>>> I'm eager to hear your thoughts on this.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <Niels@basjes.nl> wrote:
>>>
>>>> Hi,
>>>>
>>>> It is my understanding that the exactly-once semantics regarding the
>>>> input from Kafka is based on the checkpointing in the source component
>>>> retaining the offset where it was at the checkpoint moment.
>>>>
>>>> My question is how does that work for a sink? How can I make sure that
>>>> (in light of failures) each message that is read from Kafka (my input) is
>>>> written to Kafka (my output) exactly once?
>>>>
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Mime
View raw message