flink-user mailing list archives

From Dongwon Kim <eastcirc...@gmail.com>
Subject Re: Kafka ProducerFencedException after checkpointing
Date Wed, 21 Mar 2018 09:30:01 GMT
Hi Piotr,

Now my streaming pipeline is working without retries.
I decreased Flink's checkpoint interval from 15min to 10min as you
suggested [see screenshot_10min_ckpt.png].

I thought that the producer's transaction timeout starts when the
transaction itself starts.
The truth is that the producer's transaction timeout starts after the last
checkpoint is committed.
Now that the producer's transaction timeout is 15 minutes, Flink's
checkpoint interval is 10 minutes, and every checkpoint takes less than 5
minutes, everything is working fine.
Am I right?
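To sanity-check that arithmetic, here is a minimal sketch in plain Java (the class and variable names are mine; the durations are the ones above). It computes the worst-case span a transaction can stay open, from the start of one checkpoint to the end of the next, and compares it with the timeout:

```java
import java.time.Duration;

public class TimeoutCheck {
    public static void main(String[] args) {
        Duration txTimeout = Duration.ofMinutes(15); // producer transaction.timeout.ms
        Duration interval  = Duration.ofMinutes(10); // Flink checkpoint interval
        Duration maxCkpt   = Duration.ofMinutes(5);  // observed upper bound per checkpoint

        // A transaction opened at the start of checkpoint N is only committed
        // once checkpoint N+1 completes, so the worst case is roughly one
        // interval plus one checkpoint duration.
        Duration worstCase = interval.plus(maxCkpt);

        System.out.println("worst case = " + worstCase.toMinutes()
                + " min, fits timeout = " + (worstCase.compareTo(txTimeout) <= 0));
    }
}
```

Note that 10 + 5 minutes lands exactly on the 15-minute timeout, so this setup works only because checkpoints actually finish in under 5 minutes; a slower checkpoint would leave no safety margin.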

Anyway thank you very much for the detailed explanation!

Best,

Dongwon



On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <piotr@data-artisans.com>
wrote:

> Hi,
>
> Please increase transaction.timeout.ms to a greater value or decrease
> Flink’s checkpoint interval; I’m pretty sure the issue here is that those
> two values overlap. I think that’s even visible in the screenshots: the
> first checkpoint started at 14:28:48 and ended at 14:30:43, while the
> second one started at 14:45:53 and ended at 14:49:16. That gives you a
> minimal transaction duration of 15 minutes and 10 seconds, and a maximal
> transaction duration of roughly 21 minutes.
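Those bounds can be reproduced from the quoted timestamps with a quick sketch in plain Java (the class name is mine; the timestamps are the ones from the screenshots):

```java
import java.time.Duration;
import java.time.LocalTime;

public class TransactionSpan {
    public static void main(String[] args) {
        // Checkpoint timestamps quoted above.
        LocalTime ckpt1Start = LocalTime.parse("14:28:48");
        LocalTime ckpt1End   = LocalTime.parse("14:30:43");
        LocalTime ckpt2Start = LocalTime.parse("14:45:53");
        LocalTime ckpt2End   = LocalTime.parse("14:49:16");

        // Shortest possible transaction: end of checkpoint 1 to start of checkpoint 2.
        Duration minTx = Duration.between(ckpt1End, ckpt2Start);
        // Longest possible transaction: start of checkpoint 1 to end of checkpoint 2.
        Duration maxTx = Duration.between(ckpt1Start, ckpt2End);

        // Both reach or exceed the 15-minute transaction.timeout.ms,
        // which is why the producer gets fenced.
        System.out.println("min = " + minTx + ", max = " + maxTx);
    }
}
```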
>
> In the HAPPY SCENARIO (without any failure and restarting), your timeout
> should cover, with some safety margin, the period between the start of one
> checkpoint and the end of the NEXT checkpoint, since this is the upper
> bound on how long the transaction might be used. In your case that is at
> least ~25 minutes.
>
> On top of that, as described in the docs at
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance
> your timeout must, in case of failure, also cover the additional downtime
> required for a successful job restart. Thus you should increase your
> timeout accordingly.
>
> Piotrek
>
>
> On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirclek@gmail.com> wrote:
>
> Hi Piotr,
>
> We have set the producer's transaction.timeout.ms to 15 minutes and have
> used the default setting for the broker (15 minutes).
> As Flink's checkpoint interval is 15 minutes, it is not a situation where
> Kafka's timeout is smaller than Flink's checkpoint interval.
> As our first checkpoint takes just 2 minutes, it seems the transaction is
> not being committed properly.
>
> Best,
>
> - Dongwon
>
>
>
>
>
> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <piotr@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> What’s your Kafka transaction timeout setting? Please check both the Kafka
>> producer configuration (the transaction.timeout.ms property) and the Kafka
>> broker configuration. The most likely cause of such an error message is
>> that Kafka’s timeout is smaller than Flink’s checkpoint interval, so
>> transactions are not committed quickly enough before the timeout occurs.
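As an illustration of the producer side, here is a minimal sketch in plain Java of how such a configuration object might look; the broker address and the one-hour value are placeholders I chose, not values from this thread, and the Properties object would then be handed to Flink's Kafka producer. Note that the broker-side transaction.max.timeout.ms (default 15 minutes) caps whatever the producer requests:

```java
import java.util.Properties;

public class ProducerTimeoutConfig {
    public static void main(String[] args) {
        Properties producerConfig = new Properties();
        // Placeholder broker address.
        producerConfig.setProperty("bootstrap.servers", "localhost:9092");
        // One hour: an illustrative value chosen to cover the checkpoint
        // interval, the checkpoint duration, and restart downtime.
        producerConfig.setProperty("transaction.timeout.ms",
                String.valueOf(60 * 60 * 1000));

        System.out.println(producerConfig.getProperty("transaction.timeout.ms")); // prints 3600000
    }
}
```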
>>
>> Piotrek
>>
>> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirclek@gmail.com> wrote:
>>
>>
>> Hi,
>>
>> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th,
>> 7th, ... checkpoints:
>>
>> --
>>
>> java.lang.RuntimeException: Error while confirming checkpoint
>> 	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> 	at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>>
>> --
>>
>>
>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly-once processing
>> with the Kafka sink.
>> We use FsStateBackend to store snapshot data on HDFS.
>>
>> As shown in configuration.png, my checkpoint configuration is:
>> - Checkpointing Mode : Exactly Once
>> - Interval : 15m 0s
>> - Timeout : 10m 0s
>> - Minimum Pause Between Checkpoints : 5m 0s
>> - Maximum Concurrent Checkpoints : 1
>> - Persist Checkpoints Externally : Disabled
>>
>> After the first checkpoint completed [see history after 1st ckpt.png],
>> the job is restarted due to the ProducerFencedException [see exception
>> after 1st ckpt.png].
>> The first checkpoint takes less than 2 minutes while my checkpoint
>> interval is 15m and minimum pause between checkpoints is 5m.
>> After the job is restarted, the second checkpoint is triggered after a
>> while [see history after 2nd ckpt.png] and this time I've got no exception.
>> The third checkpoint results in the same exception as after the first
>> checkpoint.
>>
>> Can anybody let me know what's going wrong behind the scene?
>>
>> Best,
>>
>> Dongwon
>> <history after 3rd ckpt.png> <exception after 3rd ckpt.png> <history after
>> 2nd ckpt.png> <configuration.png> <summary.png> <exception after 1st
>> ckpt.png> <history after 1st ckpt.png>
>>
>>
>>
>
>
