Date: Mon, 19 Sep 2016 15:55:21 +0000 (UTC)
From: "Tzu-Li (Gordon) Tai (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

    [ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503837#comment-15503837 ]

Tzu-Li (Gordon) Tai commented on FLINK-4618:
--------------------------------------------

I'm not entirely sure of
whether the KafkaConsumer starts "at" or "after" the found offsets in ZK though. Correctly, it should be "after". Perhaps something is wrong here, and we need to work around the behaviour?

> FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
> Reporter: Melmoth
> Fix For: 1.2.0, 1.1.3
>
>
> **Original reported ticket title: Last kafka message gets consumed twice when restarting job**
> There seems to be an issue with the offset management in Flink. When a job is stopped and started again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new consumer group and emitted one record.
> You can clearly see that the consumer waits for a new record at offset 4848911, which is correct. After restarting, it consumes a record at 4848910, causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools; the committed offset in ZooKeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646
> 10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Committed offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:2946
> -- Restarting job
> 10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667.
> 10:32:01,673 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0]
> 10:32:01,677 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646
> // See below! Shouldn't the offset be 4848911?
> 10:32:01,682 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to the committed offset 4848910
> 10:32:01,683 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:32:01,685 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 1001 at hdp1:6667.
> 10:32:01,687 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 1001
> // Here record 4848910 gets consumed again!
> 10:32:01,707 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
> 10:32:01,708 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
> 10:32:03,721 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:32:04,224 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:32:04,726 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:32:04,894 INFO org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache
> 10:32:04,903 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:3079
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
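
For readers following the "at" versus "after" question, here is a minimal sketch of the two commit choices. It assumes Kafka's usual convention that a committed offset names the next record to read, and that a restarted consumer fetches starting at the committed offset, as the log above shows. The class and method names (OffsetCommitSketch, offsetToCommit, offsetsReadAfterRestart) are hypothetical and used only for illustration; this is a toy model, not Flink or Kafka client code.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: a toy model of offset commit and restore (hypothetical names). */
class OffsetCommitSketch {

    /** Under the assumed convention, the committed offset is the next record to read. */
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    /**
     * Offsets a restarted consumer would read when it resumes "at" the committed
     * offset. logEndOffset is exclusive: the next offset the broker would assign.
     */
    static List<Long> offsetsReadAfterRestart(long committedOffset, long logEndOffset) {
        List<Long> read = new ArrayList<>();
        for (long offset = committedOffset; offset < logEndOffset; offset++) {
            read.add(offset);
        }
        return read;
    }

    public static void main(String[] args) {
        long lastProcessed = 4848910L; // the single record consumed in the log above
        long logEnd = 4848911L;        // position the consumer was waiting at

        // Committing the last processed offset itself replays that record on restart.
        System.out.println("commit last processed: "
                + offsetsReadAfterRestart(lastProcessed, logEnd));

        // Committing lastProcessed + 1 resumes at the next record; nothing is replayed.
        System.out.println("commit next offset:    "
                + offsetsReadAfterRestart(offsetToCommit(lastProcessed), logEnd));
    }
}
```

With the values from the log, the first call returns a list containing 4848910 (the duplicate), and the second returns an empty list. Whether the adjustment belongs at commit time or at restore time is a design choice this sketch does not settle.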