From: "Tzu-Li (Gordon) Tai (JIRA)"
To: issues@flink.apache.org
Date: Tue, 2 Aug 2016 10:59:20 +0000 (UTC)
Subject: [jira] [Comment Edited] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403771#comment-15403771 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-4280 at 8/2/16 10:59 AM:
---------------------------------------------------------------------

[~StephanEwen]

- *Can we keep the Kafka08 and Kafka09 configuration style similar?*
The latest configuration suggestion used the {{Properties}} only to configure the internal consumers, which means the properties are simply passed on. Under that approach, any behaviour or configuration that the internal Kafka consumer doesn't recognise would have to be supplied through setter methods. The internal {{SimpleConsumer}} in 0.8 and the {{KafkaConsumer}} in 0.9 respond differently to the same {{Properties}}. For example, {{SimpleConsumer}} doesn't recognise {{group.id}}, while {{KafkaConsumer}} does. As a result, the setter methods would also need to be designed somewhat differently for each version to provide the same overall functionality.

After some reconsideration, it probably isn't necessary to rework the configuration to this extent. {{FlinkKafkaConsumer08}} already implements the {{group.id}} and {{auto.commit.enable}} behaviour on top of {{SimpleConsumer}}. In the current state of the Kafka connectors, {{FlinkKafkaConsumer08}} and {{FlinkKafkaConsumer09}} therefore have equal functionality. By building on what we already have, we can keep the reworked configuration similar for the two versions, and we also won't break current behaviour.

- *How about calling offsets in ZK/Kafka the "Group Offsets" ...*
I agree, "Group Offsets" is a better name.

- *Does it ever make sense to commit to a different group than to start from?*
If we keep the {{group.id}} setting in the properties for both 0.8 and 0.9, this case doesn't need to be considered.

Taking into account the above and the comments so far, how about this for the reworked Kafka configuration:

{code}
Properties props = new Properties();

// for the 0.8 consumer
props.setProperty("group.id", "...");
props.setProperty("auto.commit.enable", "true/false");
// config for periodic committing in 0.8
props.setProperty("auto.commit.interval.ms", "...");
...

// or for the 0.9 consumer
props.setProperty("group.id", "...");
props.setProperty("enable.auto.commit", "true/false");
// config for periodic committing in 0.9
props.setProperty("auto.commit.interval.ms", "...");
...

FlinkKafkaConsumer08 kafka = new FlinkKafkaConsumer08("topic", schema, props);
// or
FlinkKafkaConsumer09 kafka = new FlinkKafkaConsumer09("topic", schema, props);

kafka.setStartFromEarliest();
kafka.setStartFromLatest();
kafka.setStartFromGroupOffsets(); // uses the "group.id" in props

kafka.setCommitGroupOffsetsOnCheckpoint(boolean);
// commits to the "group.id" (Flink checkpointing must be enabled for this to take effect).
// Also, this overrides periodic committing if it is enabled in the
// props by "auto.commit.enable" / "enable.auto.commit".
// If this is false and periodic committing is also not set in props, no offsets are committed.

kafka.setForwardMetrics(boolean);
...
{code}

To avoid breaking current behaviour, the default starting position will be "from group offsets", and "commit on checkpoint" will be true.

Let me know what you think!

> New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4280
>                 URL: https://issues.apache.org/jira/browse/FLINK-4280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" or "latest" position in topics with the Flink Kafka consumer, users set the Kafka config {{auto.offset.reset}} in the provided properties.
> However, the way this config actually works can be misleading for users who are looking for a way to "read topics from a starting position". In the Flink Kafka consumer, {{auto.offset.reset}} follows Kafka's original intent for the setting. First, the consumer checks for existing external offsets committed to ZK / the brokers. Only if none exist is {{auto.offset.reset}} respected.
> I propose adding Flink-specific ways to define the starting position that don't take the external offsets into account. The original behaviour (referencing external offsets first) can become a user option. That way, the behaviour is retained for frequent Kafka users who may need to cooperate with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a warning)
> props.setProperty("group.id", "..."); // no longer affects the starting position (may still be used for external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be latest
> props.setProperty("group.id", "..."); // used to look up external offsets in ZK / broker on startup
> ...
> {code}
> One thing we still need to decide is the default value for {{flink.starting-position}}.
> I see two merits in adding this:
> 1. It matches the way users generally interpret "read from a starting position". The Flink Kafka connector is essentially a "high-level" Kafka consumer for Flink users. I therefore think it is reasonable to add Flink-specific functionality that users will find useful, even though Kafka's original consumer design didn't support it.
> 2. It makes the following idea more definite and solid: "the Kafka offset store (ZK / brokers) is used only to expose progress to the outside world, and not to control how Kafka topics are read in Flink (unless users opt to do so)". This PR (https://github.com/apache/flink/pull/1690, FLINK-3398) included some discussion of this aspect. I think adding this option further decouples Flink's internal offset checkpointing from Kafka's external offset store.
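The thread covers three related decisions: which starting positions exist, how the "group offsets first, then {{auto.offset.reset}}" fallback works, and when offsets are committed back to the group. These decisions can be summarised in a small, self-contained Java sketch. It is not Flink code. {{StartPositionSketch}}, {{StartupMode}}, {{CommitMode}} and all method names are hypothetical, and the committed group offset is modelled as an {{Optional}}. Two details are assumptions rather than part of the proposal. First, {{"smallest"}} (the 0.8 spelling) is accepted as an alias for {{"earliest"}}. Second, when commit-on-checkpoint is requested but checkpointing is disabled, the sketch falls back to periodic committing if that is configured in the props.

```java
import java.util.Optional;

/**
 * Hypothetical sketch (not Flink API) of the FLINK-4280 discussion:
 * resolving a starting offset and an offset-commit mode.
 */
public class StartPositionSketch {

    /** Flink-level starting position (hypothetical names). */
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    /** How offsets are written back to ZK / the brokers for the configured group.id. */
    enum CommitMode { ON_CHECKPOINTS, PERIODIC, DISABLED }

    /** Maps the proposed "flink.starting-position" values to a StartupMode. */
    static StartupMode parseStartingPosition(String value) {
        switch (value) {
            case "earliest":
                return StartupMode.EARLIEST;
            case "latest":
                return StartupMode.LATEST;
            case "external-offsets":
                return StartupMode.GROUP_OFFSETS;
            default:
                throw new IllegalArgumentException("Unknown flink.starting-position: " + value);
        }
    }

    /**
     * Resolves the first offset to read for one partition.
     * committedOffset is the offset committed for the group.id, if any;
     * autoOffsetReset is only consulted in GROUP_OFFSETS mode.
     */
    static long resolveStartOffset(StartupMode mode, Optional<Long> committedOffset,
                                   long earliest, long latest, String autoOffsetReset) {
        switch (mode) {
            case EARLIEST:
                return earliest; // committed group offsets are ignored
            case LATEST:
                return latest;   // committed group offsets are ignored
            default:
                // GROUP_OFFSETS: use the committed offset if one exists, otherwise
                // fall back to auto.offset.reset (default: latest).
                // Assumption: "smallest" (0.8 spelling) is treated like "earliest".
                boolean resetToEarliest =
                        "earliest".equals(autoOffsetReset) || "smallest".equals(autoOffsetReset);
                return committedOffset.orElse(resetToEarliest ? earliest : latest);
        }
    }

    /**
     * Resolves the commit mode. Assumption: if commit-on-checkpoint is requested but
     * checkpointing is disabled, the periodic auto-commit settings from props apply.
     */
    static CommitMode resolveCommitMode(boolean commitOnCheckpoint,
                                        boolean checkpointingEnabled,
                                        boolean periodicCommitInProps) {
        if (commitOnCheckpoint && checkpointingEnabled) {
            return CommitMode.ON_CHECKPOINTS; // overrides periodic committing
        }
        if (periodicCommitInProps) {
            return CommitMode.PERIODIC; // "auto.commit.enable" (0.8) / "enable.auto.commit" (0.9)
        }
        return CommitMode.DISABLED; // no offset committing at all
    }

    public static void main(String[] args) {
        Optional<Long> committed = Optional.of(42L);
        StartupMode groupMode = parseStartingPosition("external-offsets");

        System.out.println(resolveStartOffset(StartupMode.EARLIEST, committed, 0L, 100L, "latest"));  // 0
        System.out.println(resolveStartOffset(groupMode, committed, 0L, 100L, "latest"));             // 42
        System.out.println(resolveStartOffset(groupMode, Optional.empty(), 0L, 100L, "earliest"));    // 0
        System.out.println(resolveCommitMode(true, false, true));                                     // PERIODIC
    }
}
```

The sketch models the committed offset as an {{Optional}}, so the "committed first, then reset" fallback stays in one expression. A real connector would instead look that offset up per partition from ZK (0.8) or the brokers (0.9).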