flink-user mailing list archives

From Tony Wei <tony19920...@gmail.com>
Subject Re: Kafka ProducerFencedException after checkpointing
Date Mon, 12 Aug 2019 04:29:21 GMT
Hi,

I ran into the same exception recently. I want to confirm: if it is caused by
a transaction timeout, will I lose that data? And can I make the producer fall
back to at-least-once semantics in this situation?

Best,
Tony Wei

On Wed, Mar 21, 2018 at 10:28 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:

> Hi,
>
> But that’s exactly the case: producer’s transaction timeout starts when
> the external transaction starts - but FlinkKafkaProducer011 keeps an active
> Kafka transaction for the whole period between checkpoints.
>
> As I wrote in the previous message:
>
> > in case of failure, your timeout must also be able to cover the
> > additional downtime required for the successful job restart. Thus you
> > should increase your timeout accordingly.
>
> I think that a 15 minute timeout is far too small. If your job
> fails because of some intermittent failure (for example a worker
> crash/restart), you will only have a couple of minutes for a successful
> Flink job restart. Otherwise you will lose some data (because of the
> transaction timeouts).
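
A minimal sketch of how such a timeout could be sized, using only the JDK. It assumes the timeout should cover the checkpoint interval, the checkpoint timeout (15 and 10 minutes in the configuration quoted in this thread), the restart downtime, and a safety margin. The class and method names are hypothetical, the 10 minute downtime and 5 minute margin are illustrative guesses, and the broker-side cap is assumed to be Kafka's transaction.max.timeout.ms setting, which the thread only refers to as the broker default of 15 minutes.

```java
import java.time.Duration;
import java.util.Properties;

/** Hypothetical helper; not part of Flink or Kafka. */
public class TransactionTimeoutSizing {

    /** Sums the periods that a single open transaction may have to survive. */
    static Duration requiredTimeout(Duration checkpointInterval,
                                    Duration checkpointTimeout,
                                    Duration restartDowntime,
                                    Duration safetyMargin) {
        return checkpointInterval
                .plus(checkpointTimeout)
                .plus(restartDowntime)
                .plus(safetyMargin);
    }

    /** Builds producer properties carrying the chosen timeout in milliseconds. */
    static Properties producerProperties(Duration timeout) {
        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms", Long.toString(timeout.toMillis()));
        return props;
    }

    /** True if the producer timeout is larger than the broker-side maximum (assumed cap). */
    static boolean exceedsBrokerMax(Duration producerTimeout, Duration brokerMaxTimeout) {
        return producerTimeout.compareTo(brokerMaxTimeout) > 0;
    }

    public static void main(String[] args) {
        Duration timeout = requiredTimeout(
                Duration.ofMinutes(15),  // checkpoint interval from the thread
                Duration.ofMinutes(10),  // checkpoint timeout from the thread
                Duration.ofMinutes(10),  // assumed restart downtime
                Duration.ofMinutes(5));  // assumed safety margin

        Properties props = producerProperties(timeout);
        System.out.println("transaction.timeout.ms=" + props.getProperty("transaction.timeout.ms"));

        // With a 15 minute broker maximum, the broker setting would need raising too.
        if (exceedsBrokerMax(timeout, Duration.ofMinutes(15))) {
            System.out.println("producer timeout exceeds the broker maximum");
        }
    }
}
```

The resulting Properties object would then be handed to the Kafka producer; how that is wired into the Flink sink depends on the connector version in use.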
>
> Piotrek
>
> On 21 Mar 2018, at 10:30, Dongwon Kim <eastcirclek@gmail.com> wrote:
>
> Hi Piotr,
>
> Now my streaming pipeline is working without retries.
> I decreased Flink's checkpoint interval from 15min to 10min as you
> suggested [see screenshot_10min_ckpt.png].
>
> I thought that the producer's transaction timeout starts when the external
> transaction starts.
> The truth is that the producer's transaction timeout starts after the last
> external checkpoint is committed.
> Now that I have 15min for Producer's transaction timeout and 10min for
> Flink's checkpoint interval, and every checkpoint takes less than 5
> minutes, everything is working fine.
> Am I right?
>
> Anyway thank you very much for the detailed explanation!
>
> Best,
>
> Dongwon
>
>
>
> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <piotr@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> Please increase transaction.timeout.ms to a greater value or decrease
>> Flink’s checkpoint interval. I’m pretty sure the issue here is that those
>> two values are overlapping. I think that’s even visible on the screenshots.
>> The first completed checkpoint started at 14:28:48 and ended at 14:30:43, while
>> the second one started at 14:45:53 and ended at 14:49:16. That gives you a
>> minimal transaction duration of 15 minutes and 10 seconds, with a maximal
>> transaction duration of 21 minutes.
>>
>> In the happy scenario (without any failures or restarts), your timeout
>> should cover, with some safety margin, the period between the start of a
>> checkpoint and the end of the NEXT checkpoint, since this is the upper
>> bound on how long the transaction might be in use. In your case that is at
>> least ~25 minutes.
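
The checkpoint timestamps quoted in this message can be checked with a short JDK-only sketch. The class and method names are hypothetical, and pairing the minimum with "end of first checkpoint to start of second" and the maximum with "start of first to end of second" is an inference from the figures given here, not something stated explicitly.

```java
import java.time.Duration;
import java.time.LocalTime;

/** Hypothetical helper; not part of Flink or Kafka. */
public class TransactionWindow {

    /** Time between the end of one checkpoint and the start of the next. */
    static Duration minDuration(LocalTime firstEnd, LocalTime secondStart) {
        return Duration.between(firstEnd, secondStart);
    }

    /** Time between the start of one checkpoint and the end of the next. */
    static Duration maxDuration(LocalTime firstStart, LocalTime secondEnd) {
        return Duration.between(firstStart, secondEnd);
    }

    public static void main(String[] args) {
        // Timestamps taken from the screenshots described in the message.
        LocalTime firstStart = LocalTime.parse("14:28:48");
        LocalTime firstEnd = LocalTime.parse("14:30:43");
        LocalTime secondStart = LocalTime.parse("14:45:53");
        LocalTime secondEnd = LocalTime.parse("14:49:16");

        Duration min = minDuration(firstEnd, secondStart);
        Duration max = maxDuration(firstStart, secondEnd);
        Duration configuredTimeout = Duration.ofMinutes(15);

        System.out.println("min = " + min); // PT15M10S
        System.out.println("max = " + max); // PT20M28S
        System.out.println("min exceeds 15 min timeout: "
                + (min.compareTo(configuredTimeout) > 0)); // true
    }
}
```

Even the shortest window (15 min 10 s) is longer than the configured 15 minute timeout, which is consistent with the transactions expiring before they could be committed.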
>>
>> On top of that, as described in the docs,
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance ,
>> in case of failure, your timeout must also be able to cover the additional
>> downtime required for the successful job restart. Thus you should increase
>> your timeout accordingly.
>>
>> Piotrek
>>
>>
>> On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirclek@gmail.com> wrote:
>>
>> Hi Piotr,
>>
>> We have set producer's [transaction.timeout.ms] to 15 minutes and have
>> used the default setting for broker (15 mins).
>> As Flink's checkpoint interval is 15 minutes, it is not a situation where
>> Kafka's timeout is smaller than Flink's checkpoint interval.
>> As our first checkpoint just takes 2 minutes, it seems like transaction
>> is not committed properly.
>>
>> Best,
>>
>> - Dongwon
>>
>>
>>
>>
>>
>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <piotr@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> What is your Kafka transaction timeout setting? Please check both the
>>> Kafka producer configuration (transaction.timeout.ms property) and the
>>> Kafka broker configuration. The most likely cause of such an error message
>>> is that Kafka's timeout is smaller than Flink’s checkpoint interval, so
>>> transactions are not committed before the timeout occurs.
>>>
>>> Piotrek
>>>
>>> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirclek@gmail.com> wrote:
>>>
>>>
>>> Hi,
>>>
>>> I'm faced with the following ProducerFencedException after 1st, 3rd,
>>> 5th, 7th, ... checkpoints:
>>>
>>> --
>>>
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>> 	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> 	at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted
>>> an operation with an old epoch. Either there is a newer producer with the same transactionalId,
>>> or the producer's transaction has been expired by the broker.
>>>
>>> --
>>>
>>>
>>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once
>>> processing using Kafka sink.
>>> We use FsStateBackend to store snapshot data on HDFS.
>>>
>>> As shown in configuration.png, my checkpoint configuration is:
>>> - Checkpointing Mode : Exactly Once
>>> - Interval : 15m 0s
>>> - Timeout : 10m 0s
>>> - Minimum Pause Between Checkpoints : 5m 0s
>>> - Maximum Concurrent Checkpoints : 1
>>> - Persist Checkpoints Externally : Disabled
>>>
>>> After the first checkpoint completed [see history after 1st ckpt.png],
>>> the job is restarted due to the ProducerFencedException [see exception
>>> after 1st ckpt.png].
>>> The first checkpoint takes less than 2 minutes while my checkpoint
>>> interval is 15m and minimum pause between checkpoints is 5m.
>>> After the job is restarted, the second checkpoint is triggered after a
>>> while [see history after 2nd ckpt.png] and this time I've got no exception.
>>> The third checkpoint results in the same exception as after the first
>>> checkpoint.
>>>
>>> Can anybody let me know what's going wrong behind the scenes?
>>>
>>> Best,
>>>
>>> Dongwon
>>> <history after 3rd ckpt.png><exception after 3rd ckpt.png><history
>>> after 2nd ckpt.png><configuration.png><summary.png><exception
>>> after 1st ckpt.png><history after 1st ckpt.png>
>>>
>>>
>>>
>>
>>
> <screenshot_10min_ckpt.png>
>
>
>
