flink-user mailing list archives

From Piotr Nowojski <pi...@data-artisans.com>
Subject Re: Avoid duplicate messages while restarting a job for an application upgrade
Date Mon, 02 Oct 2017 17:09:50 GMT
We are planning to work on this clean shutdown after releasing Flink 1.4. Implementing it
properly would require some work, for example:
- adding checkpoint options that carry information about a “closing”/“shutting down” event
- adding clean shutdown to the source functions API
- implementing the handling of this clean shutdown in the desired sources

Those are not super complicated changes but also not trivial.

One thing that you could do is implement a very hacky filter function placed directly after
the source operator, which you would trigger manually. Normally it would pass all messages
through. Once triggered, it would wait for the next checkpoint to happen, assume that this
checkpoint is a savepoint, and start filtering out all subsequent messages. When this
checkpoint completes, you could manually shut down your Flink application. This could
guarantee that there are no duplicated writes after a restart. It might work for a clean
shutdown, but it would be a very hacky solution.
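
A minimal sketch of that gate logic, in plain Java with no Flink dependency so it can be read
and run on its own. Everything here is an assumption for illustration: the class name
SavepointGate and the trigger() method are hypothetical, and the remaining methods only mirror
the callbacks that Flink's FilterFunction, CheckpointedFunction and CheckpointListener
interfaces would provide in a real job. How the trigger reaches every parallel instance is
left open.

```java
/**
 * Framework-free model of the hacky filter placed right after the source.
 * In a real job, filter(), snapshotState() and notifyCheckpointComplete()
 * would be wired to the corresponding Flink callbacks (assumption).
 */
final class SavepointGate<T> {
    // trigger() is expected to be called from outside the task thread.
    private volatile boolean armed = false;
    private volatile boolean closed = false;
    private volatile boolean safeToShutdown = false;
    private volatile long gatingCheckpointId = -1L;

    /** Manual trigger: close the gate at the next checkpoint. */
    void trigger() {
        armed = true;
    }

    /** Returns true if the record may continue downstream. */
    boolean filter(T record) {
        return !closed;
    }

    /** Called when a checkpoint reaches this operator. */
    void snapshotState(long checkpointId) {
        if (armed && !closed) {
            // Assume this checkpoint is the savepoint. Everything after it
            // will be replayed on restore, so drop it from now on.
            gatingCheckpointId = checkpointId;
            closed = true;
        }
    }

    /** Called once a checkpoint has completed. */
    void notifyCheckpointComplete(long checkpointId) {
        // Only the exact gating checkpoint counts: a later checkpoint would
        // already contain the offsets of records that were dropped here.
        if (closed && checkpointId == gatingCheckpointId) {
            safeToShutdown = true;
        }
    }

    /** True once the job can be cancelled and restored from the gating savepoint. */
    boolean isSafeToShutdown() {
        return safeToShutdown;
    }
}
```

With this model, the job must be restored from exactly the checkpoint that closed the gate. If
that checkpoint fails, the dropped messages are not covered by any savepoint and the gate
would have to be reopened, which the sketch does not handle.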

Btw, keep in mind that even with a clean shutdown you can still end up with duplicated
messages after a crash, and there is no way around this with Kafka 0.9.

Piotrek

> On Oct 2, 2017, at 5:30 PM, Antoine Philippot <antoine.philippot@teads.tv> wrote:
> 
> Thanks for your answer, Piotr. Sadly, we can't use Kafka 0.11 for now (and won't be able to for a while).
> 
> We cannot afford tens of thousands of duplicated messages for each application upgrade.
> Can I help by working on this feature?
> Do you have any hints or details on this part of that "todo list"?
>  
> 
> On Mon, Oct 2, 2017, at 16:50, Piotr Nowojski <piotr@data-artisans.com> wrote:
> Hi,
> 
> For failure recovery with Kafka 0.9, it is not possible to avoid duplicated messages.
> Using Flink 1.4 (not yet released) combined with Kafka 0.11, it will be possible to achieve
> exactly-once end-to-end semantics when writing to Kafka. However, this is still a work in progress:
> 
> https://issues.apache.org/jira/browse/FLINK-6988
> 
> However, this is a superset of the functionality that you are asking for. Exactly-once just
> for clean shutdowns is also on our “TODO” list (it would/could support Kafka 0.9), but
> it is not currently being actively developed.
> 
> Piotr Nowojski
> 
>> On Oct 2, 2017, at 3:35 PM, Antoine Philippot <antoine.philippot@teads.tv> wrote:
>> 
>> Hi,
>> 
>> I'm working on a Flink streaming app with a Kafka 0.9 to Kafka 0.9 use case, which handles
>> around 100k messages per second.
>> 
>> To upgrade our application, we used to run a flink cancel with savepoint command, followed
>> by a flink run with the previously saved savepoint and the new application fat jar as parameters.
>> We noticed that we can end up with more than 50k duplicated messages in the Kafka sink, which
>> is not idempotent.
>> 
>> This behaviour is problematic for this project, and I am trying to find a solution
>> or workaround to avoid these duplicated messages.
>> 
>> The JobManager clearly indicates that the cancel call is triggered once the savepoint
>> is finished, but during the savepoint execution the Kafka source continues to poll new messages,
>> which will not be part of the savepoint and will be replayed on the next application start.
>> 
>> I tried to find a solution with the stop command-line argument, but the Kafka source
>> doesn't implement StoppableFunction (https://issues.apache.org/jira/browse/FLINK-3404),
>> and, in contrast to cancel, stop cannot generate a savepoint.
>> 
>> Is there another way to avoid processing duplicated messages on each application
>> upgrade or rescaling?
>> 
>> If not, has someone planned to implement it? Otherwise, I can propose a pull request
>> after some architectural advice.
>> 
>> The final goal is to stop the source from polling and to trigger a savepoint once polling has stopped.
>> 
>> Thanks
> 

