flink-issues mailing list archives

From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
Date Sat, 30 Jul 2016 05:37:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15400472#comment-15400472 ]

Tzu-Li (Gordon) Tai commented on FLINK-4280:

[~StephanEwen] On second thought, I think I misunderstood what you meant in the first place.

What you're proposing is this (I think this is a clearer design than what I mentioned above):

Properties props = new Properties();

FlinkKafkaConsumer kafka = new FlinkKafkaConsumer("topic", schema, props);
// if true, commits offsets on checkpoint when checkpointing is enabled; otherwise, periodically
kafka.setEnableCommitOffsets(true);

env.addSource(kafka) ...

Echoing your statement, if we go with this approach:

now that we are trying to separate Flink-specific configs from Kafka configs, I think we should
clearly state (and change the implementation so) that the {{Properties}} provided in the constructor
are only used to configure the internal Kafka consumers the connector uses, by simply passing
the {{Properties}} through. So, the only configs in the {{Properties}} that take effect are the
ones the Kafka API supports, i.e. in {{FlinkKafkaConsumer08}}, only the configs that the Kafka
{{SimpleConsumer}} API supports take effect; in {{FlinkKafkaConsumer09}}, only the configs that
the new consumer API {{KafkaConsumer}} supports take effect. Any additional function or
Flink-specific behaviour on top of the internal Kafka consumers should go through setter methods,
as sketched below.
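
As a minimal sketch of that separation (the class and field names here are hypothetical,
purely to illustrate the rule, not actual connector code):

import java.util.Properties;

// the connector keeps the user-supplied Properties untouched and hands them
// straight to the internal Kafka client; Flink-specific behaviour lives in setters
public class SketchedKafkaConsumer {
    private final String topic;
    private final Properties kafkaProperties;
    private boolean commitOffsetsEnabled;

    public SketchedKafkaConsumer(String topic, Properties props) {
        this.topic = topic;
        this.kafkaProperties = props; // forwarded verbatim to the internal consumer
    }

    public void setEnableCommitOffsets(boolean enable) {
        this.commitOffsetsEnabled = enable; // Flink-specific, never read from Properties
    }
}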

The general problem with the current configuration is that we are trying to "mimic" high-level
consumer functions with the original config keys. Take {{FlinkKafkaConsumer08}} for example:
the {{SimpleConsumer}} API doesn't actually use the {{group.id}} or {{auto.offset.reset}}
configs. We're re-implementing the behaviour of these configs ourselves and exposing them
through the original config keys in the {{Properties}}. When it comes to adding functionality
on top of the internally used {{SimpleConsumer}}, we tend to stretch the original definition
of these keys to make them work with our re-implementations of configs such as {{group.id}}
and {{auto.offset.reset}}. An example of the confusion users can run into when we re-implement
configs that the internal API doesn't actually use is in this user ML thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-td8185.html.
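
To make the confusion concrete, current usage looks roughly like this (placeholder values;
per the discussion above, with {{FlinkKafkaConsumer08}} the {{group.id}} and {{auto.offset.reset}}
keys are re-implemented by Flink rather than read by the {{SimpleConsumer}} itself):

Properties props = new Properties();
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");          // re-implemented by Flink, unknown to SimpleConsumer
props.setProperty("auto.offset.reset", "earliest"); // re-implemented by Flink, unknown to SimpleConsumer
FlinkKafkaConsumer08<String> kafka =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), props);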

This also supports the idea you mentioned in FLINK-3398 that we should drop Kafka's {{group.id}}
and perhaps have Flink's own groupId. Since Kafka's {{group.id}} was never actually used by
the internal {{SimpleConsumer}} of {{FlinkKafkaConsumer08}} in the first place, we should
have setter methods for functions like "start with offset" or "offset committing", to which the
user supplies a groupId. For {{FlinkKafkaConsumer09}}, we won't need a setter method
for "periodic offset committing", because the internal {{KafkaConsumer}} supports that function
through {{group.id}} and {{enable.auto.commit}}; instead, we have a setter method that lets
users opt in to "commit offsets on checkpoint".

To summarize in code:

// for FlinkKafkaConsumer08
Properties props = new Properties();
FlinkKafkaConsumer08 kafka = new FlinkKafkaConsumer08("topic", schema, props);
// commits periodically if checkpointing is not enabled, otherwise on notifyCheckpointComplete()
kafka.setEnableCommitOffsets("groupId");

// for FlinkKafkaConsumer09
Properties props = new Properties();
FlinkKafkaConsumer09 kafka = new FlinkKafkaConsumer09("topic", schema, props);
// doesn't take a "group.id", because in FlinkKafkaConsumer09 "group.id" is a config
// recognized by the new KafkaConsumer API
kafka.setStartFromExternalOffsets();
// if true (checkpointing should be enabled), overrides periodic offset committing
// if "enable.auto.commit" is set in props
kafka.setCommitOffsetsOnCheckpoint(true);

So, the general rule is:

- Supplied configuration is used only to configure the internally used client APIs of the
external system.
- All Flink-specific configuration, and any functions that the internal API does not support,
go through connector-specific setter methods (see the sketch below).
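
As a quick end-to-end sketch of a job following these two rules (using the
{{setCommitOffsetsOnCheckpoint}} setter proposed above, which is not an existing API):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // plain Kafka config: passed through
props.setProperty("group.id", "my-group");                // recognized by the new KafkaConsumer API

FlinkKafkaConsumer09<String> kafka =
    new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);
kafka.setCommitOffsetsOnCheckpoint(true); // Flink-specific behaviour goes through a setter

env.addSource(kafka).print();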

This might be a general rule we would like all Flink-supported connectors to follow in the
long run? Users would have a clear understanding of, and full control over, the behaviour of
the internal APIs the connectors use, and we'd also have a clear line on how new functionality
should be added on top of them.

> New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
> -----------------------------------------------------------------------------------------------------------------------
>                 Key: FLINK-4280
>                 URL: https://issues.apache.org/jira/browse/FLINK-4280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0
> Currently, to start reading from the "earliest" or "latest" position in topics, users of the
Flink Kafka consumer set the Kafka config {{auto.offset.reset}} in the provided properties.
> However, the way this config actually works might be a bit misleading if users were trying
to find a way to "read topics from a starting position". The way the {{auto.offset.reset}}
config works in the Flink Kafka consumer resembles Kafka's original intent for the setting:
first, existing external offsets committed to ZK / the brokers are checked; only if none exist
is {{auto.offset.reset}} respected.
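> As a minimal sketch of that lookup order (the helper names here are hypothetical, for
illustration only):
> {code}
> // committed external offsets win; auto.offset.reset is only a fallback
> Long committed = fetchCommittedOffsetFromZk(groupId, partition); // hypothetical helper
> long startOffset;
> if (committed != null) {
>     startOffset = committed;
> } else if ("earliest".equals(props.getProperty("auto.offset.reset"))) {
>     startOffset = getEarliestOffset(partition); // hypothetical helper
> } else {
>     startOffset = getLatestOffset(partition);   // hypothetical helper
> }
> {code}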
> I propose to add Flink-specific ways to define the starting position, without taking the
external offsets into account. The original behaviour (reference external offsets first) can
become a user option, so that it can be retained for frequent Kafka users who may need to
interoperate with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, with a newly
introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a warning)
> props.setProperty("group.id", "...") // this won't have effect on the starting position
anymore (may still be used in external offset committing)
> ...
> {code}
> Or, to reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be latest
> props.setProperty("group.id", "..."); // will be used to lookup external offsets in ZK
/ broker on startup
> ...
> {code}
> One thing we still need to decide is what the default value of {{flink.starting-position}} should be.
> Two merits I see in adding this:
> 1. This matches the way users generally interpret "read from a starting position". Since
the Flink Kafka connector is essentially a "high-level" Kafka consumer for Flink users, I think
it is reasonable to add Flink-specific functionality that users will find useful, even though
it wasn't supported in Kafka's original consumer designs.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is used only to
expose progress to the outside world, and not to manipulate how Kafka topics are read in Flink
(unless users opt to do so)" becomes even more definite and solid. There was some discussion
on this aspect in this PR (https://github.com/apache/flink/pull/1690, FLINK-3398). I think
adding this further decouples Flink's internal offset checkpointing from the external Kafka
offset store.
