Yes, if it’s due to transaction timeout you will lose the data.

Whether can you fallback to at least once, that depends on Kafka, not on Flink, since it’s the Kafka that timeouts those transactions and I don’t see in the documentation anything that could override this [1]. You might try disabling the mechanism via setting `transaction.abort.timed.out.transaction.cleanup.interval.ms` or `transaction.remove.expired.transaction.cleanup.interval.ms`, but that’s question more to Kafka guys. Maybe Becket could help with this.

Also it MIGHT be that Kafka doesn’t remove records from the topics when aborting the transaction and MAYBE you can still access them via “READ_UNCOMMITTED” mode. But that’s again, question to Kafka. 

Sorry that I can not help more.

If you do not care about exactly once, why don’t you just set the connector to at least once mode?


On 12 Aug 2019, at 06:29, Tony Wei <tony19920430@gmail.com> wrote:


I had the same exception recently. I want to confirm that if it is due to transaction timeout,
then I will lose those data. Am I right? Can I make it fall back to at least once semantic in
this situation?

Tony Wei

Piotr Nowojski <piotr@data-artisans.com> 於 2018年3月21日 週三 下午10:28寫道:

But that’s exactly the case: producer’s transaction timeout starts when the external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka transaction for the whole period between checkpoints.

As I wrote in the previous message:

> in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly.

I think that 15 minutes timeout is a way too small value. If your job fails because of some intermittent failure (for example worker crash/restart), you will only have a couple of minutes for a successful Flink job restart. Otherwise you will lose some data (because of the transaction timeouts).


On 21 Mar 2018, at 10:30, Dongwon Kim <eastcirclek@gmail.com> wrote:

Hi Piotr,

Now my streaming pipeline is working without retries. 
I decreased Flink's checkpoint interval from 15min to 10min as you suggested [see screenshot_10min_ckpt.png].

I though that producer's transaction timeout starts when the external transaction starts.
The truth is that Producer's transaction timeout starts after the last external checkpoint is committed.
Now that I have 15min for Producer's transaction timeout and 10min for Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, everything is working fine.
Am I right?

Anyway thank you very much for the detailed explanation!



On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:

Please increase transaction.timeout.ms to a greater value or decrease Flink’s checkpoint interval, I’m pretty sure the issue here is that those two values are overlapping. I think that’s even visible on the screenshots. First checkpoint completed started at 14:28:48 and ended at 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16. That gives you minimal transaction duration of 15 minutes and 10 seconds, with maximal transaction duration of 21 minutes.

In HAPPY SCENARIO (without any failure and restarting), you should assume that your timeout interval should cover with some safety margin the period between start of a checkpoint and end of the NEXT checkpoint, since this is the upper bound how long the transaction might be used. In your case at least ~25 minutes.

On top of that, as described in the docs, https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance , in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly. 


On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirclek@gmail.com> wrote:

Hi Piotr,

We have set producer's [transaction.timeout.ms] to 15 minutes and have used the default setting for broker (15 mins).
As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout is smaller than Flink's checkpoint interval.
As our first checkpoint just takes 2 minutes, it seems like transaction is not committed properly.


- Dongwon

On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:

What’s your Kafka’s transaction timeout setting? Please both check Kafka producer configuration (transaction.timeout.ms property) and Kafka broker configuration. The most likely cause of such error message is when Kafka's timeout is smaller then Flink’s checkpoint interval and transactions are not committed quickly enough before timeout occurring.


On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirclek@gmail.com> wrote:


I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 7th, ... checkpoints:
java.lang.RuntimeException: Error while confirming checkpoint
	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing using Kafka sink.
We use FsStateBackend to store snapshot data on HDFS.

As shown in configuration.png, my checkpoint configuration is:
- Checkpointing Mode : Exactly Once
- Interval : 15m 0s
- Timeout : 10m 0s
- Minimum Pause Between Checkpoints : 5m 0s
- Maximum Concurrent Checkpoints : 1
- Persist Checkpoints Externally : Disabled

After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
The first checkpoint takes less than 2 minutes while my checkpoint interval is 15m and minimum pause between checkpoints is 5m.
After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png] and this time I've got no exception.
The third checkpoint results in the same exception as after the first checkpoint.

Can anybody let me know what's going wrong behind the scene?


<history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd ckpt.png><configuration.png><summary.png><exception after 1st ckpt.png><history after 1st ckpt.png>