Subject: Re: Checkpoint takes long with FlinkKafkaConsumer
From: Kostas Kloudas
Date: Tue, 14 Jun 2016 14:23:45 +0200
To: user@flink.apache.org
Message-Id: <055BE947-709F-48C1-B935-D1BC978FF0C4@data-artisans.com>

Hi Hironori,

Could you also provide the logs of the TaskManager?

From your description, it seems that the consumer is stuck in the polling loop, although Flink polls with a timeout. This would normally mean that it periodically releases the lock so that the checkpoints can go through.

The TaskManager logs can help clarify why this does not happen.

Thanks,
Kostas

> On Jun 14, 2016, at 12:48 PM, Hironori Ogibayashi wrote:
>
> Kostas,
>
> Thank you for your response.
> Yes, I am using the latest Flink, which is 1.0.3.
>
> Thanks,
> Hironori
>
> 2016-06-14 19:02 GMT+09:00 Kostas Kloudas:
>> Hello Hironori,
>>
>> Are you using the latest Flink version?
>> There were some changes in the FlinkConsumer in the latest releases.
>>
>> Thanks,
>> Kostas
>>
>>> On Jun 14, 2016, at 11:52 AM, Hironori Ogibayashi wrote:
>>>
>>> Hello,
>>>
>>> I am running a Flink job which reads topics from Kafka and writes results
>>> to Redis. I use FsStateBackend with HDFS.
>>>
>>> I noticed that taking a checkpoint takes several minutes and sometimes expires.
>>> ---
>>> 2016-06-14 17:25:40,734 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1456 (in 257956 ms)
>>> 2016-06-14 17:25:40,735 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1457 @ 1465892740734
>>> 2016-06-14 17:35:40,735 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1457 expired before completing.
>>> 2016-06-14 17:35:40,736 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1458 @ 1465893340735
>>> 2016-06-14 17:45:40,736 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1458 expired before completing.
>>> 2016-06-14 17:45:40,737 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1459 @ 1465893940736
>>> 2016-06-14 17:55:40,738 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1459 expired before completing.
>>> 2016-06-14 17:55:40,739 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1460 @ 1465894540738
>>> ---
>>>
>>> According to the web UI, the checkpoint size is just 1 MB. Why does
>>> checkpointing take so long?
>>>
>>> I took a jstack dump during checkpointing. It looks like the checkpointing
>>> thread is blocked in commitOffsets.
>>>
>>> ---
>>> "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry [0x00007f2b3ddfc000]
>>>    java.lang.Thread.State: BLOCKED (on object monitor)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
>>>     - waiting to lock <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
>>>     - locked <0x0000000659111cc8> (a java.lang.Object)
>>>     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> ---
>>>
>>> The thread holding that lock is this one:
>>>
>>> ---
>>> "Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable [0x00007f2b3dbfa000]
>>>    java.lang.Thread.State: RUNNABLE
>>>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>>>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>>>     - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2)
>>>     - locked <0x0000000659457db8> (a java.util.Collections$UnmodifiableSet)
>>>     - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl)
>>>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>>>     at org.apache.kafka.common.network.Selector.select(Selector.java:425)
>>>     at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
>>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
>>>     - locked <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
>>> ---
>>>
>>> If someone could advise me of the cause or the way to investigate
>>> further, that would be appreciated.
>>>
>>> Thanks,
>>> Hironori
>>
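
For readers of the archive, the locking pattern visible in the two stack traces (one thread polling while holding the KafkaConsumer monitor, another waiting for that monitor in commitOffsets) can be imitated with a small standalone Java sketch. This is not Flink or Kafka code: the class PollLockSketch, the method pollsBeforeCommit, and the sleep that stands in for poll(timeout) are all hypothetical. It only illustrates that a poll timeout releases the monitor between polls, and that Java intrinsic locks make no fairness promise, so a waiting thread may have to sit through more than one poll. Whether that is what happened in this thread is not established here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; none of these names come from Flink or Kafka.
final class PollLockSketch {

    /**
     * Starts a "consumer" thread that holds a shared monitor for the whole of
     * each simulated poll, then requests the same monitor from the calling
     * thread, as an offset commit would. Returns the number of polls that had
     * completed when the caller finally obtained the monitor.
     */
    static int pollsBeforeCommit(int maxPolls, long pollTimeoutMillis) {
        if (maxPolls < 1) {
            throw new IllegalArgumentException("maxPolls must be at least 1");
        }
        final Object consumerLock = new Object();
        final AtomicInteger completedPolls = new AtomicInteger();
        final CountDownLatch firstPollStarted = new CountDownLatch(1);

        Thread poller = new Thread(() -> {
            for (int i = 0; i < maxPolls; i++) {
                synchronized (consumerLock) {
                    firstPollStarted.countDown();
                    try {
                        // Stands in for poll(timeout) on the real consumer.
                        Thread.sleep(pollTimeoutMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    completedPolls.incrementAndGet();
                }
                // The monitor is free only at this point. Intrinsic locks are
                // not fair, so the poller may re-acquire it before the waiter.
            }
        }, "poller");
        poller.setDaemon(true);
        poller.start();

        try {
            // Make sure the poller already holds the monitor before we ask for it.
            firstPollStarted.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the poller", e);
        }

        int seen;
        synchronized (consumerLock) { // stands in for the commit path
            seen = completedPolls.get();
        }
        poller.interrupt(); // stop the simulated consumer
        return seen;
    }

    public static void main(String[] args) {
        int polls = pollsBeforeCommit(50, 20L);
        System.out.println("polls completed before the commit got the lock: " + polls);
    }
}
```

The printed count varies from run to run and between JVMs. A result of 1 means the waiting thread got the monitor right after the first poll; a larger number means the poller re-acquired it first, which lengthens the wait by roughly one poll timeout per extra iteration.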