From: Kostas Kloudas <k.kloudas@data-artisans.com>
Subject: Re: Checkpoint takes long with FlinkKafkaConsumer
Date: Tue, 14 Jun 2016 15:40:17 +0200
To: user@flink.apache.org

Hello Hironori,

The logs just show that you get stuck in the Kafka consumer polling loop,
which does not allow the consumer lock to be released. Thus the Flink part
of the consumer is never actually called.

To my understanding this does not seem to be a Flink issue, or at least
the logs do not show one.

From googling a bit, I found this:
http://stackoverflow.com/questions/35636739/kafka-consumer-marking-the-coordinator-2147483647-dead
which relates the problem to network issues.

Have you tried posting the problem to the Kafka mailing list as well?
Could it be that the Kafka broker fails and tries to reconnect but does not
make it?

Kostas

> On Jun 14, 2016, at 2:59 PM, Hironori Ogibayashi <ogibayashi@gmail.com> wrote:
>
> Kostas,
>
> I have attached a log file from one of the taskManagers (the same host
> where I ran jstack).
> I noticed that there are lots of "Marking the coordinator 2147482645
> dead" messages in the log.
> MyContinuousProcessingTimeTriggerGlobal in the log is my custom
> trigger, based on ContinuousProcessingTimeTrigger, but it cleans up
> windows when it receives specific log records.
>
> Thanks,
> Hironori
>
> 2016-06-14 21:23 GMT+09:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>> Hi Hironori,
>>
>> Could you also provide the logs of the taskManager?
>>
>> As you described, it seems that the consumer is stuck in the polling loop, although Flink polls with
>> a timeout. This would normally mean that it should periodically release the lock for the checkpoints to go through.
>>
>> The logs of the task manager can help clarify why this does not happen.
>>
>> Thanks,
>> Kostas
>>
>>> On Jun 14, 2016, at 12:48 PM, Hironori Ogibayashi <ogibayashi@gmail.com> wrote:
>>>
>>> Kostas,
>>>
>>> Thank you for your response.
>>> Yes, I am using the latest Flink, which is 1.0.3.
>>>
>>> Thanks,
>>> Hironori
>>>
>>> 2016-06-14 19:02 GMT+09:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>>>> Hello Hironori,
>>>>
>>>> Are you using the latest Flink version?
>>>> There were some changes in the FlinkConsumer in the latest releases.
>>>>
>>>> Thanks,
>>>> Kostas
>>>>
>>>>> On Jun 14, 2016, at 11:52 AM, Hironori Ogibayashi <ogibayashi@gmail.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I am running a Flink job which reads topics from Kafka and writes results
>>>>> to Redis. I use FsStateBackend with HDFS.
>>>>>
>>>>> I noticed that taking a checkpoint takes several minutes and sometimes expires.
>>>>> ---
>>>>> 2016-06-14 17:25:40,734 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Completed checkpoint 1456 (in 257956 ms)
>>>>> 2016-06-14 17:25:40,735 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Triggering checkpoint 1457 @ 1465892740734
>>>>> 2016-06-14 17:35:40,735 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Checkpoint 1457 expired before completing.
>>>>> 2016-06-14 17:35:40,736 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Triggering checkpoint 1458 @ 1465893340735
>>>>> 2016-06-14 17:45:40,736 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Checkpoint 1458 expired before completing.
>>>>> 2016-06-14 17:45:40,737 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Triggering checkpoint 1459 @ 1465893940736
>>>>> 2016-06-14 17:55:40,738 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Checkpoint 1459 expired before completing.
>>>>> 2016-06-14 17:55:40,739 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>>> Triggering checkpoint 1460 @ 1465894540738
>>>>> ---
>>>>>
>>>>> According to the WebUI, the checkpoint size is just 1MB. Why does
>>>>> checkpointing take so long?
>>>>>
>>>>> I took a jstack dump during checkpointing. It looks like the checkpointing
>>>>> thread is blocked in commitOffsets.
>>>>>
>>>>> ----
>>>>> "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon
>>>>> prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry
>>>>> [0x00007f2b3ddfc000]
>>>>> java.lang.Thread.State: BLOCKED (on object monitor)
>>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
>>>>> - waiting to lock <0x0000000659111b58> (a
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer)
>>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
>>>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
>>>>> - locked <0x0000000659111cc8> (a java.lang.Object)
>>>>> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
>>>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> ---
>>>>>
>>>>> Blocker is this.
>>>>>
>>>>> ---
>>>>> "Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable
>>>>> [0x00007f2b3dbfa000]
>>>>> java.lang.Thread.State: RUNNABLE
>>>>> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>>>>> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>>>>> - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2)
>>>>> - locked <0x0000000659457db8> (a java.util.Collections$UnmodifiableSet)
>>>>> - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl)
>>>>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>>>>> at org.apache.kafka.common.network.Selector.select(Selector.java:425)
>>>>> at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
>>>>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>>>>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>>>>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>>>>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
>>>>> - locked <0x0000000659111b58> (a
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer)
>>>>> ---
>>>>>
>>>>> If someone could advise me of the cause or how to investigate
>>>>> further, that would be appreciated.
>>>>>
>>>>> Thanks,
>>>>> Hironori
>>>>
>>
>
> <flink-flink-taskmanager-0-FLINK1503.log.gz>
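[Editor's note] The two jstack traces in the thread boil down to classic monitor contention: the consumer thread holds the monitor of the shared KafkaConsumer object while it polls, so the checkpoint thread stays BLOCKED waiting to enter a synchronized section on the same object to commit offsets. The following minimal sketch reproduces the pattern; it is an illustration only, not Flink's actual code, and `consumerLock` merely stands in for the shared KafkaConsumer instance.

```java
// Sketch of the contention seen in the jstack output (illustrative only).
public class ConsumerLockSketch {
    // Stands in for the shared KafkaConsumer object (<0x0000000659111b58>).
    static final Object consumerLock = new Object();

    public static void main(String[] args) throws Exception {
        // The "consumer thread": holds the monitor for its whole poll loop.
        Thread consumer = new Thread(() -> {
            synchronized (consumerLock) {
                try { Thread.sleep(2000); }   // simulates a poll() that keeps running
                catch (InterruptedException ignored) {}
            }
        }, "consumer");

        // The "checkpoint thread": needs the same monitor to commit offsets.
        Thread checkpointer = new Thread(() -> {
            synchronized (consumerLock) {
                // offset commit would happen here
            }
        }, "checkpointer");

        consumer.start();
        Thread.sleep(200);        // let the consumer grab the lock first
        checkpointer.start();
        Thread.sleep(200);

        // While the consumer holds the monitor, the checkpointer is BLOCKED,
        // just like the "Async calls on Source" thread in the trace above.
        System.out.println(checkpointer.getState());  // prints BLOCKED

        consumer.join();
        checkpointer.join();      // completes once the consumer releases the lock
    }
}
```

If the real poll returns within its timeout, the lock is released periodically and the commit goes through; the traces suggest that release never happens here, which matches Kostas's reading.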