From: Stephan Ewen
Date: Thu, 6 Oct 2016 17:30:08 +0200
Subject: Re: Flink Kafka Consumer Behaviour
To: user@flink.apache.org

Hi!

There was an issue in the Kafka 0.9 consumer in Flink concerning checkpoints. It was relevant mostly for lower-throughput topics / partitions.

It is fixed in the 1.1.3 release. Can you try out the release candidate and see if that solves your problem?

See here for details on the release candidate: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-3-RC1-td13860.html

To test this, set the version of the flink-connector-kafka-09 dependency to "1.1.3" and add the staging repository described in the link above to your pom.xml.

Thanks,
Stephan


On Tue, Oct 4, 2016 at 5:51 AM, ankitcha <ankitchaudhary123@gmail.com> wrote:
Hi Prabhu, cc Stephan, Robert,

I was having similar issues where the Flink Kafka 0.9 consumer was not committing offsets to Kafka. After digging into the JobManager logs, I found that checkpoints were expiring before they completed, and hence the "checkpoint completed" message was being ignored.

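The mechanism described in the paragraph above can be illustrated with a toy model, assuming that the consumer records its offsets when a checkpoint is taken and commits them to Kafka only once it is told that this checkpoint completed. OffsetCommitModel and its methods are hypothetical names chosen for illustration; this is a simplified sketch, not Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of "commit offsets to Kafka only when a
 * checkpoint completes". Not Flink code; all names are illustrative.
 */
final class OffsetCommitModel {
    // Offsets captured at each checkpoint, keyed by checkpoint id (ascending).
    private final TreeMap<Long, Map<Integer, Long>> pending = new TreeMap<>();
    // Offsets that have actually been "committed to Kafka", per partition.
    private final Map<Integer, Long> committed = new HashMap<>();

    /** A checkpoint is taken: remember the current offsets under its id. */
    void snapshot(long checkpointId, Map<Integer, Long> currentOffsets) {
        pending.put(checkpointId, new HashMap<>(currentOffsets));
    }

    /** Called only for checkpoints that completed; expired ones never get here. */
    void notifyComplete(long checkpointId) {
        Map<Integer, Long> offsets = pending.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already-discarded checkpoint
        }
        committed.putAll(offsets);
        // Discard this checkpoint and every older pending snapshot.
        pending.headMap(checkpointId, true).clear();
    }

    /** Copy of the offsets committed so far. */
    Map<Integer, Long> committedOffsets() {
        return new HashMap<>(committed);
    }

    /** Number of snapshots still waiting for a completion notification. */
    int pendingCount() {
        return pending.size();
    }
}
```

In this model, if checkpoints 37 and 38 expire, notifyComplete is never called for them, so nothing is committed and the offsets in Kafka stay where they were. The first checkpoint that does complete commits its offsets and discards the older pending snapshots.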
To verify, I increased the checkpoint timeout from the default 10 minutes to 30 minutes. The checkpoints then finished well before the timeout (in about 12 minutes), and the consumer correctly updated the offsets in Kafka.
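A minimal sketch of the change described above, assuming the Flink 1.1 DataStream Java API with the Kafka 0.9 connector on the classpath (Maven artifact flink-connector-kafka-0.9_2.10 or _2.11, version 1.1.3). StreamExecutionEnvironment.enableCheckpointing sets the checkpoint interval, and CheckpointConfig.setCheckpointTimeout sets how long a checkpoint may run before it expires (10 minutes by default). The class name, topic, group id, broker address, and the 5-minute interval are placeholders, not values from this thread.

```java
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class CheckpointTimeoutJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 5 minutes (illustrative value).
        env.enableCheckpointing(5 * 60 * 1000L);

        // Raise the timeout from the 10-minute default to 30 minutes so that
        // slow initial checkpoints (about 12 minutes here) can complete.
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");       // placeholder

        // Placeholder topic; offsets are committed to Kafka on completed checkpoints.
        env.addSource(new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("kafka-09-checkpoint-timeout");
    }
}
```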

This seems to be working for us at the moment. Note also that this scenario normally happens at the start of the job, when the consumer group already has considerable lag.

So you might want to try increasing the checkpoint timeout and checking whether that solves the issue. Look for messages like the following in the JobManager logs:

[Checkpoint Timer] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 37 expired before completing.
[Checkpoint Timer] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 38 @ 1474313373634
[Checkpoint Timer] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 38 expired before completing.
[Checkpoint Timer] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 39 @ 1474313973640
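To check for this pattern without reading the logs by hand, a small Java helper could scan JobManager log lines for the two message shapes quoted above. The class and method names are hypothetical, and the regular expressions only match the exact wording shown in these lines; other Flink versions may phrase the messages differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: scan JobManager log lines for checkpoint trigger/expiry messages. */
final class CheckpointLogScan {
    // Matches e.g. "Triggering checkpoint 38 @ 1474313373634" (id, epoch millis).
    private static final Pattern TRIGGER =
            Pattern.compile("Triggering checkpoint (\\d+) @ (\\d+)");
    // Matches e.g. "Checkpoint 37 expired before completing".
    private static final Pattern EXPIRED =
            Pattern.compile("Checkpoint (\\d+) expired before completing");

    /** Ids of checkpoints reported as expired, in log order. */
    static List<Long> expiredIds(List<String> lines) {
        List<Long> ids = new ArrayList<>();
        for (String line : lines) {
            Matcher m = EXPIRED.matcher(line);
            if (m.find()) {
                ids.add(Long.parseLong(m.group(1)));
            }
        }
        return ids;
    }

    /** Milliseconds between consecutive "Triggering checkpoint" timestamps. */
    static List<Long> triggerGapsMillis(List<String> lines) {
        List<Long> gaps = new ArrayList<>();
        long previous = -1;
        for (String line : lines) {
            Matcher m = TRIGGER.matcher(line);
            if (m.find()) {
                long ts = Long.parseLong(m.group(2));
                if (previous >= 0) {
                    gaps.add(ts - previous);
                }
                previous = ts;
            }
        }
        return gaps;
    }
}
```

Run over the four log lines above, expiredIds returns [37, 38] and triggerGapsMillis returns [600006], i.e. roughly 10 minutes between triggers. That gap is consistent with each checkpoint running into the 10-minute default timeout before the next one is triggered.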
--
Ankit



