Delivered-To: mailing list user@flink.apache.org
From: Piotr Nowojski <piotr@data-artisans.com>
Subject: Re: Kafka ProducerFencedException after checkpointing
Date: Mon, 12 Aug 2019 09:27:03 +0200
To: Tony Wei, Becket Qin
Cc: Dongwon Kim, user@flink.apache.org, Keuntae Park

Hi,

Yes, if it's due to a transaction timeout you will lose the data.

Whether you can fall back to at-least-once depends on Kafka, not on Flink, since it's Kafka that times out those transactions, and I don't see anything in the documentation that could override this [1]. You might try disabling the mechanism by setting `transaction.abort.timed.out.transaction.cleanup.interval.ms` or `transaction.remove.expired.transaction.cleanup.interval.ms`, but that's more of a question for the Kafka folks. Maybe Becket could help with this.

Also, it MIGHT be that Kafka doesn't remove records from the topics when aborting the transaction, and MAYBE you can still access them via "READ_UNCOMMITTED" mode. But that's again a question for Kafka.

Sorry that I cannot help more.

If you do not care about exactly-once, why don't you just set the connector to at-least-once mode?

Piotrek

> On 12 Aug 2019, at 06:29, Tony Wei <tony19920430@gmail.com> wrote:
> 
> Hi,
> 
> I had the same exception recently.
> I want to confirm that if it is due to a transaction timeout,
> then I will lose that data. Am I right? Can I make it fall back to at-least-once semantics in
> this situation?
> 
> Best,
> Tony Wei
> 
> Piotr Nowojski <piotr@data-artisans.com> wrote on Wednesday, 21 March 2018 at 10:28 PM:
> Hi,
> 
> But that's exactly the case: the producer's transaction timeout starts when the external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka transaction for the whole period between checkpoints.
> 
> As I wrote in the previous message:
> 
> > in case of failure, your timeout must also be able to cover the additional downtime required for a successful job restart. Thus you should increase your timeout accordingly.
> 
> I think that a 15-minute timeout is way too small a value. If your job fails because of some intermittent failure (for example a worker crash/restart), you will only have a couple of minutes for a successful Flink job restart. Otherwise you will lose some data (because of the transaction timeouts).
> 
> Piotrek
> 
>> On 21 Mar 2018, at 10:30, Dongwon Kim <eastcirclek@gmail.com> wrote:
>> 
>> Hi Piotr,
>> 
>> Now my streaming pipeline is working without retries.
>> I decreased Flink's checkpoint interval from 15min to 10min as you suggested [see screenshot_10min_ckpt.png].
>> 
>> I thought that the producer's transaction timeout starts when the external transaction starts.
>> The truth is that the producer's transaction timeout starts after the last external checkpoint is committed.
>> Now that I have 15min for the producer's transaction timeout and 10min for Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, everything is working fine.
>> Am I right?
>> 
>> Anyway, thank you very much for the detailed explanation!
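The open-transaction window described above can be sketched numerically: a transaction opened at one checkpoint stays active until the next checkpoint completes, so the producer-side transaction timeout must cover roughly one checkpoint interval plus one checkpoint's duration, with extra margin for restart downtime. A rough sketch using the 15-minute interval from this thread and an assumed 5-minute checkpoint bound (the class and method names here are illustrative, not Flink or Kafka API):

```java
import java.time.Duration;

public class TxnTimeoutCheck {
    // Worst-case time a Kafka transaction stays open in the happy path:
    // roughly from the start of one checkpoint to the end of the next one.
    static Duration worstCaseOpenTxn(Duration checkpointInterval, Duration checkpointDuration) {
        return checkpointInterval.plus(checkpointDuration);
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMinutes(15); // Flink checkpoint interval in the thread
        Duration ckptTime = Duration.ofMinutes(5);  // assumed upper bound on one checkpoint
        Duration timeout  = Duration.ofMinutes(15); // transaction.timeout.ms used in the thread

        Duration worstCase = worstCaseOpenTxn(interval, ckptTime);
        System.out.println("worst-case open transaction: " + worstCase.toMinutes() + " min");
        // false: a 15-minute timeout cannot cover a 20-minute window,
        // even before adding any restart-downtime margin.
        System.out.println("covered by timeout: " + (timeout.compareTo(worstCase) >= 0));
    }
}
```

With a safety margin on top of this, one lands near the ~25 minutes Piotrek recommends later in the thread.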
>> 
>> Best,
>> 
>> Dongwon
>> 
>> 
>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:
>> Hi,
>> 
>> Please increase transaction.timeout.ms to a greater value or decrease Flink's checkpoint interval; I'm pretty sure the issue here is that those two values are overlapping. I think that's even visible in the screenshots. The first checkpoint started at 14:28:48 and ended at 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16. That gives you a minimal transaction duration of 15 minutes and 10 seconds, and a maximal transaction duration of about 21 minutes.
>> 
>> In the HAPPY SCENARIO (without any failure and restarting), you should assume that your timeout interval covers, with some safety margin, the period between the start of a checkpoint and the end of the NEXT checkpoint, since this is the upper bound on how long the transaction might be used. In your case, at least ~25 minutes.
>> 
>> On top of that, as described in the docs, https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance , in case of failure, your timeout must also be able to cover the additional downtime required for a successful job restart. Thus you should increase your timeout accordingly.
>> 
>> Piotrek
>> 
>>> On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirclek@gmail.com> wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> We have set the producer's [transaction.timeout.ms] to 15 minutes and have used the default setting for the broker (15 mins).
>>> As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout is smaller than Flink's checkpoint interval.
>>> As our first checkpoint takes just 2 minutes, it seems like the transaction is not committed properly.
>>> 
>>> Best,
>>> 
>>> - Dongwon
>>> 
>>> 
>>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <piotr@data-artisans.com> wrote:
>>> Hi,
>>> 
>>> What's your Kafka transaction timeout setting? Please check both the Kafka producer configuration (the transaction.timeout.ms property) and the Kafka broker configuration. The most likely cause of such an error message is that Kafka's timeout is smaller than Flink's checkpoint interval and transactions are not committed quickly enough before the timeout occurs.
>>> 
>>> Piotrek
>>> 
>>>> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirclek@gmail.com> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> I'm faced with the following ProducerFencedException after the 1st, 3rd, 5th, 7th, ... checkpoints:
>>>> --
>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>> 	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>> 	at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>>>> --
>>>> 
>>>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly-once processing using the Kafka sink.
>>>> We use FsStateBackend to store snapshot data on HDFS.
>>>> 
>>>> As shown in configuration.png, my checkpoint configuration is:
>>>> - Checkpointing Mode : Exactly Once
>>>> - Interval : 15m 0s
>>>> - Timeout : 10m 0s
>>>> - Minimum Pause Between Checkpoints : 5m 0s
>>>> - Maximum Concurrent Checkpoints : 1
>>>> - Persist Checkpoints Externally : Disabled
>>>> 
>>>> After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
>>>> The first checkpoint takes less than 2 minutes, while my checkpoint interval is 15m and the minimum pause between checkpoints is 5m.
>>>> After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png], and this time I've got no exception.
>>>> The third checkpoint results in the same exception as after the first checkpoint.
>>>> 
>>>> Can anybody let me know what's going wrong behind the scenes?
>>>> 
>>>> Best,
>>>> 
>>>> Dongwon
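The transaction durations Piotrek reads off the screenshots can be verified directly from the timestamps quoted in the thread. A small worked sketch (the timestamps are copied from the thread; the class name is illustrative): the shortest-lived transaction runs from the end of one checkpoint to the start of the next, and the longest-lived from the start of one checkpoint to the end of the next.

```java
import java.time.Duration;
import java.time.LocalTime;

public class CheckpointGapCheck {
    public static void main(String[] args) {
        // Timestamps quoted from the screenshots discussed in this thread.
        LocalTime ckpt1Start = LocalTime.of(14, 28, 48);
        LocalTime ckpt1End   = LocalTime.of(14, 30, 43);
        LocalTime ckpt2Start = LocalTime.of(14, 45, 53);
        LocalTime ckpt2End   = LocalTime.of(14, 49, 16);

        // Minimal transaction duration: end of checkpoint 1 to start of checkpoint 2.
        Duration minTxn = Duration.between(ckpt1End, ckpt2Start);   // 15 min 10 s
        // Maximal transaction duration: start of checkpoint 1 to end of checkpoint 2.
        Duration maxTxn = Duration.between(ckpt1Start, ckpt2End);   // 20 min 28 s (~21 min)

        long timeoutSec = 15 * 60; // the 15-minute transaction.timeout.ms in the thread
        System.out.println("min txn = " + minTxn.getSeconds() + " s, max txn = " + maxTxn.getSeconds() + " s");
        // Even the shortest-lived transaction outlives the 15-minute timeout,
        // which is why the broker expires it and fences the producer.
        System.out.println("min exceeds timeout: " + (minTxn.getSeconds() > timeoutSec));
    }
}
```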