Date: Thu, 15 Sep 2016 16:52:20 +0000 (UTC)
From: "Tzu-Li (Gordon) Tai (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Updated] (FLINK-4618) Kafka Consumer 0.9 should start from the next record on startup from offsets in Kafka
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8

     [ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai updated FLINK-4618:
---------------------------------------
    Summary: Kafka Consumer 0.9 should start from the next record on startup from offsets in Kafka  (was: Kafka Consumer 0.9 should start from the next record on startup from Kafka offsets)

> Kafka Consumer 0.9 should start from the next record on startup from offsets in Kafka
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-4618
>                 URL: https://issues.apache.org/jira/browse/FLINK-4618
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.2
>         Environment: Flink 1.1.2
>                      Kafka Broker 0.10.0
>                      Hadoop 2.7.0
>            Reporter: Melmoth
>
> **Original reported ticket title: Last kafka message gets consumed twice when restarting job**
>
> There seems to be an issue with the offset management in Flink. When a job is stopped and started again, the record at the previously committed offset is read a second time.
>
> I enabled checkpoints (EXACTLY_ONCE) and the FsStateBackend. I started with a new consumer group and emitted one record.
>
> You can clearly see that the consumer waits for a new record at offset 4848911, which is correct. After restarting, it consumes the record at offset 4848910 again, causing that record to be consumed more than once.
>
> I checked the offset with the Kafka CMD tools; the committed offset in ZooKeeper is 4848910.
>
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646
> 10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Committed offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:2946
> -- Restarting job
> 10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667.
> 10:32:01,673 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0]
> 10:32:01,677 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646
> // See below! Shouldn't the offset be 4848911?
> 10:32:01,682 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to the committed offset 4848910
> 10:32:01,683 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:32:01,685 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 1001 at hdp1:6667.
> 10:32:01,687 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 1001
> // Here record 4848910 gets consumed again!
> 10:32:01,707 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
> 10:32:01,708 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
> 10:32:03,721 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:32:04,224 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:32:04,726 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:32:04,894 INFO org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache
> 10:32:04,903 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:3079
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
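The retitled summary points at the convention behind this bug: in the Kafka 0.9+ consumer, a committed offset denotes the position of the *next* record to fetch, not the last record processed. The log shows the client committing 4848910 right after reading record 4848910, so a restart seeks back to 4848910 and re-delivers it; committing the last processed offset plus one avoids the duplicate. A minimal sketch of that convention, assuming a hypothetical helper name (`offsetToCommit` is not part of the Flink or Kafka API):

```java
public class OffsetCommitConvention {
    /**
     * Kafka's new consumer (0.9+) treats a committed offset as the position
     * of the NEXT record to read. After fully processing a record, the
     * offset to commit is therefore lastProcessedOffset + 1.
     * (Hypothetical illustration helper, not Flink or Kafka code.)
     */
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    public static void main(String[] args) {
        // Last record handled before the restart, per the log above.
        long lastProcessed = 4848910L;
        long committed = offsetToCommit(lastProcessed);
        // A restarted consumer resumes at the committed offset, so it
        // fetches 4848911 next instead of re-reading 4848910.
        System.out.println("commit " + committed
                + " after processing " + lastProcessed);
    }
}
```

Under this convention the "Resetting offset for partition myTopic-0 to the committed offset 4848910" line after the restart would instead read 4848911, matching the fetch position the first run had already reached.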