flink-user mailing list archives

From Piotr Nowojski <pi...@data-artisans.com>
Subject Re: Kafka ProducerFencedException after checkpointing
Date Mon, 12 Aug 2019 07:27:03 GMT
Hi,

Yes, if it’s due to transaction timeout you will lose the data.

Whether you can fall back to at least once depends on Kafka, not on Flink, since it's Kafka
that times out those transactions, and I don't see anything in the documentation that could
override this [1]. You might try disabling the mechanism by setting `transaction.abort.timed.out.transaction.cleanup.interval.ms`
or `transaction.remove.expired.transaction.cleanup.interval.ms`, but that's more a question
for the Kafka guys. Maybe Becket could help with this.

Also it MIGHT be that Kafka doesn't remove records from the topics when aborting the transaction,
and MAYBE you can still access them via "READ_UNCOMMITTED" mode. But that's, again, a question
for Kafka.
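
For what it's worth, "READ_UNCOMMITTED" here is just the regular Kafka consumer `isolation.level`
setting (the actual config value is lowercase `read_uncommitted`, which is also the consumer
default). A rough, untested sketch with the Flink 1.4 0.11 consumer, using made-up broker, topic
and group names (import paths may differ slightly depending on your Flink version):

--
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder broker
consumerProps.setProperty("group.id", "debug-reader");               // placeholder group id
// read_uncommitted is the consumer default; read_committed would hide aborted records
consumerProps.setProperty("isolation.level", "read_uncommitted");

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
        "my-topic",                  // placeholder topic
        new SimpleStringSchema(),
        consumerProps);
--

Whether records from aborted transactions are actually still readable that way is exactly the
part I'm not sure about.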

Sorry that I can not help more.

If you do not care about exactly once, why don’t you just set the connector to at least
once mode?
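
For example, something along these lines (just a sketch against the Flink 1.4
flink-connector-kafka-0.11 API; the broker address, topic name and the `stream` variable are
placeholders):

--
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder broker

// AT_LEAST_ONCE only flushes pending records on checkpoint and does not open Kafka
// transactions, so there is no transaction timeout to hit (duplicates are possible
// after a restart, though).
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "my-topic",                                                   // placeholder topic
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProps,
        FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);

stream.addSink(producer); // stream: an existing DataStream<String>
--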

Piotrek

> On 12 Aug 2019, at 06:29, Tony Wei <tony19920430@gmail.com> wrote:
> 
> Hi,
> 
> I had the same exception recently. I want to confirm that if it is due to the transaction timeout,
> then I will lose that data. Am I right? Can I make it fall back to at least once semantics
> in this situation?
> 
> Best,
> Tony Wei
> 
> Piotr Nowojski <piotr@data-artisans.com> wrote on Wed, 21 Mar 2018 at 10:28 PM:
> Hi,
> 
> But that’s exactly the case: producer’s transaction timeout starts when the external
> transaction starts - but FlinkKafkaProducer011 keeps an active Kafka transaction for the whole
> period between checkpoints.
> 
> As I wrote in the previous message:
> 
> > in case of failure, your timeout must also be able to cover the additional downtime
> > required for the successful job restart. Thus you should increase your timeout accordingly.
> 
> I think that a 15 minute timeout is way too small a value. If your job fails because of
> some intermittent failure (for example a worker crash/restart), you will only have a couple
> of minutes for a successful Flink job restart. Otherwise you will lose some data (because
> of the transaction timeouts).
> 
> Piotrek
> 
>> On 21 Mar 2018, at 10:30, Dongwon Kim <eastcirclek@gmail.com> wrote:
>> 
>> Hi Piotr,
>> 
>> Now my streaming pipeline is working without retries. 
>> I decreased Flink's checkpoint interval from 15min to 10min as you suggested [see
>> screenshot_10min_ckpt.png].
>> 
>> I thought that the producer's transaction timeout starts when the external transaction starts.
>> The truth is that the producer's transaction timeout starts after the last external checkpoint
>> is committed.
>> Now that I have 15min for the producer's transaction timeout and 10min for Flink's checkpoint
>> interval, and every checkpoint takes less than 5 minutes, everything is working fine.
>> Am I right?
>> 
>> Anyway thank you very much for the detailed explanation!
>> 
>> Best,
>> 
>> Dongwon
>> 
>> 
>> 
>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:
>> Hi,
>> 
>> Please increase transaction.timeout.ms to a greater value or decrease Flink's checkpoint
>> interval; I'm pretty sure the issue here is that those two values are overlapping. I think
>> that's even visible on the screenshots: the first checkpoint started at 14:28:48 and ended
>> at 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16. That gives you
>> a minimal transaction duration of 15 minutes and 10 seconds, with a maximal transaction
>> duration of about 21 minutes.
>> 
>> In the HAPPY SCENARIO (without any failure and restarting), you should assume that your
>> timeout should cover, with some safety margin, the period between the start of a checkpoint
>> and the end of the NEXT checkpoint, since this is the upper bound on how long the transaction
>> might be used. In your case that is at least ~25 minutes.
>> 
>> On top of that, as described in the docs
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance),
>> in case of failure, your timeout must also be able to cover the additional downtime required
>> for the successful job restart. Thus you should increase your timeout accordingly.
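>> 
>> Concretely, that boils down to something like this (a rough sketch with the Flink 1.4 0.11
>> producer; the broker address and topic are placeholders, and remember that the broker-side
>> transaction.max.timeout.ms, 15 minutes by default, caps whatever you set on the producer,
>> so it has to be raised at least as high):
>> 
>> --
>> import java.util.Properties;
>> import org.apache.flink.streaming.api.CheckpointingMode;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
>> import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>> 
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> // 10 min checkpoint interval: well below the transaction timeout chosen below.
>> env.enableCheckpointing(10 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE);
>> 
>> Properties producerProps = new Properties();
>> producerProps.setProperty("bootstrap.servers", "kafka-broker:9092");     // placeholder broker
>> // 1 hour: covers checkpoint interval + checkpoint duration + restart downtime, with margin.
>> producerProps.setProperty("transaction.timeout.ms", String.valueOf(60 * 60 * 1000));
>> 
>> FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
>>         "my-topic",                                                       // placeholder topic
>>         new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
>>         producerProps,
>>         FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
>> --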
>> 
>> Piotrek
>> 
>> 
>>> On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirclek@gmail.com> wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> We have set the producer's transaction.timeout.ms to 15 minutes and have used the default
>>> setting for the broker (15 mins).
>>> As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout
>>> is smaller than Flink's checkpoint interval.
>>> As our first checkpoint takes just 2 minutes, it seems like the transaction is not committed
>>> properly.
>>> 
>>> Best,
>>> 
>>> - Dongwon
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:
>>> Hi,
>>> 
>>> What's your Kafka transaction timeout setting? Please check both the Kafka producer
>>> configuration (the transaction.timeout.ms property) and the Kafka broker configuration.
>>> The most likely cause of such an error message is when Kafka's timeout is smaller than
>>> Flink's checkpoint interval and transactions are not committed quickly enough before the
>>> timeout occurs.
>>> 
>>> Piotrek
>>> 
>>>> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirclek@gmail.com> wrote:
>>>> 
>>>> 
>>>> Hi,
>>>> 
>>>> I'm faced with the following ProducerFencedException after the 1st, 3rd, 5th, 7th, ...
>>>> checkpoints:
>>>> --
>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>> 	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>> 	at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an
>>>> operation with an old epoch. Either there is a newer producer with the same transactionalId,
>>>> or the producer's transaction has been expired by the broker.
>>>> --
>>>> 
>>>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly-once processing using the Kafka
>>>> sink.
>>>> We use FsStateBackend to store snapshot data on HDFS.
>>>> 
>>>> As shown in configuration.png, my checkpoint configuration is:
>>>> - Checkpointing Mode : Exactly Once
>>>> - Interval : 15m 0s
>>>> - Timeout : 10m 0s
>>>> - Minimum Pause Between Checkpoints : 5m 0s
>>>> - Maximum Concurrent Checkpoints : 1
>>>> - Persist Checkpoints Externally : Disabled
>>>> 
>>>> After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted
>>>> due to the ProducerFencedException [see exception after 1st ckpt.png].
>>>> The first checkpoint takes less than 2 minutes while my checkpoint interval is 15m and the
>>>> minimum pause between checkpoints is 5m.
>>>> After the job is restarted, the second checkpoint is triggered after a while [see history
>>>> after 2nd ckpt.png] and this time I've got no exception.
>>>> The third checkpoint results in the same exception as after the first checkpoint.
>>>> 
>>>> Can anybody let me know what's going wrong behind the scene?
>>>> 
>>>> Best,
>>>> 
>>>> Dongwon
>>>> <history after 3rd ckpt.png> <exception after 3rd ckpt.png> <history after 2nd ckpt.png>
>>>> <configuration.png> <summary.png> <exception after 1st ckpt.png> <history after 1st ckpt.png>
>>> 
>>> 
>> 
>> 
>> <screenshot_10min_ckpt.png>
> 

