From: Yitzchak Lieberman <yitzchakl@sentinelone.com>
Date: Wed, 24 Jul 2019 16:20:05 +0300
Subject: Re: timeout exception when consuming from kafka
To: Fabian Hueske <fhueske@gmail.com>
Cc: user <user@flink.apache.org>, "Tzu-Li (Gordon) Tai"

Hi.

Do we have an idea for this exception?

Thanks,
Yitzchak.

On Tue, Jul 23, 2019 at 12:59 PM Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Yitzchak,
>
> Thanks for reaching out.
> I'm not an expert on the Kafka consumer, but I think the number of
> partitions and the number of source tasks might be interesting to know.
>
> Maybe Gordon (in CC) has an idea of what's going wrong here.
>
> Best,
> Fabian
>
> On Tue, 23 Jul 2019 at 08:50, Yitzchak Lieberman <yitzchakl@sentinelone.com> wrote:
>
>> Hi.
>>
>> Another question - what will happen during a triggered checkpoint if one
>> of the Kafka brokers is unavailable?
>> Will appreciate your insights.
>>
>> Thanks.
>>
>> On Mon, Jul 22, 2019 at 12:42 PM Yitzchak Lieberman <yitzchakl@sentinelone.com> wrote:
>>
>>> Hi.
>>>
>>> I'm running a Flink application (version 1.8.0) that uses
>>> FlinkKafkaConsumer to fetch topic data and perform transformations on
>>> the data, with the state backend configured as below:
>>>
>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>     env.enableCheckpointing(5_000, CheckpointingMode.AT_LEAST_ONCE);
>>>     env.setStateBackend((StateBackend) new FsStateBackend("file:///test"));
>>>     env.getCheckpointConfig().setCheckpointTimeout(30_000);
>>>     env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>>>
>>> My problem is with the Kafka brokers: the cluster has 5 brokers in
>>> total, of which 3 are operating and 2 are down.
>>> I was able to consume the data, but when the checkpoint was triggered
>>> it threw this exception:
>>>
>>> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18] o.a.f.r.c.CheckpointCoordinator - Decline checkpoint 6 by task 457b1f801fee89d6f9544409877e29d8 of job 1c46aa5719bac1f0bea436d460b79db1.
>>> [INFO ] 2019-07-22 12:29:14.636 [flink-akka.actor.default-dispatcher-28] o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out 457b1f801fee89d6f9544409877e29d8.
>>> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18] o.a.f.r.c.CheckpointCoordinator - Discarding checkpoint 6 of job 1c46aa5719bac1f0bea436d460b79db1.
>>> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source -> Sink: Print to Std. Out (2/4) was not running
>>>     at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1198) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:700) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>     at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) ~[?:?]
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_201]
>>>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) ~[akka-actor_2.11-2.4.20.jar:?]
>>>     at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:502) ~[akka-actor_2.11-2.4.20.jar:?]
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala) ~[akka-actor_2.11-2.4.20.jar:?]
>>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:?]
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) ~[akka-actor_2.11-2.4.20.jar:?]
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495) ~[akka-actor_2.11-2.4.20.jar:?]
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) ~[akka-actor_2.11-2.4.20.jar:?]
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:?]
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:?]
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.12.jar:?]
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.12.jar:?]
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:?]
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:?]
>>> [INFO ] 2019-07-22 12:29:14.637 [flink-akka.actor.default-dispatcher-28] o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (2/4) (457b1f801fee89d6f9544409877e29d8) switched from RUNNING to FAILED.
>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
>>>
>>> My question is (as I try to understand what the checkpoint does): why is
>>> it trying to fetch topic metadata from the brokers that are down?
>>>
>>> Thanks,
>>> Yitzchak.
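
The thread refers to a FlinkKafkaConsumer but never shows how it is constructed. A minimal sketch of how such a consumer is typically wired into a Flink 1.8 job follows; the topic name, bootstrap servers, and group id are placeholders and are not taken from this job:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class KafkaSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            // Placeholder broker list and group id - not values from the thread.
            props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
            props.setProperty("group.id", "example-group");

            // "my-topic" is a placeholder topic name.
            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

            // Produces the "Source: Custom Source -> Sink: Print to Std. Out"
            // operator names that appear in the log above.
            env.addSource(consumer).print();

            env.execute("kafka-consumer-sketch");
        }
    }

The checkpointing settings quoted in the original message would be applied to the same env before execute().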
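
The final error ("Timeout expired while fetching topic metadata", org.apache.kafka.common.errors.TimeoutException) is raised by the Kafka client rather than by Flink itself. A hedged sketch of the client-side settings that bound such metadata fetches is below; the property names are standard Kafka consumer configuration keys, the values are illustrative, and nothing in the thread confirms that tuning them resolves this particular failure:

    import java.util.Properties;

    public final class KafkaConsumerProps {
        // Builds consumer properties with explicit client-side timeouts.
        // Property names are standard Kafka consumer settings; values are illustrative only.
        static Properties consumerProperties() {
            Properties props = new Properties();
            // Listing every broker lets the client fall back to the live ones for
            // metadata requests when some brokers are down (placeholder host names).
            props.setProperty("bootstrap.servers",
                    "broker1:9092,broker2:9092,broker3:9092,broker4:9092,broker5:9092");
            props.setProperty("group.id", "example-group");
            // Upper bounds (in milliseconds) on individual requests and on blocking
            // client calls such as metadata fetches. default.api.timeout.ms requires
            // a Kafka client of version 2.0 or newer.
            props.setProperty("request.timeout.ms", "30000");
            props.setProperty("default.api.timeout.ms", "60000");
            return props;
        }
    }

Such a Properties object would be passed to the FlinkKafkaConsumer constructor as in the previous sketch.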