flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
Date Wed, 23 Nov 2016 10:10:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689638#comment-15689638 ]

ASF GitHub Bot commented on FLINK-4280:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89282863
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---
    @@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception {
     
     		PeriodicOffsetCommitter periodicCommitter = null;
     		try {
    -			// read offsets from ZooKeeper for partitions that did not restore offsets
    -			{
    -				List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
    -				for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    -					if (!partition.isOffsetDefined()) {
    -						partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
    -					}
    +			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
    +			for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +				if (!partition.isOffsetDefined()) {
    +					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
     				}
    +			}
    +
    +			if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
    +				// if all partitions have no initial offsets, that means we're starting fresh without any restored state
    +				switch (startupMode) {
    +					case EARLIEST:
    +						LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
    +
    +						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +							partition.setOffset(OffsetRequest.EarliestTime());
    +						}
    +						break;
    +					case LATEST:
    +						LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
    +
    +						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +							partition.setOffset(OffsetRequest.LatestTime());
    +						}
    +						break;
    +					default:
    +					case GROUP_OFFSETS:
    +						LOG.info("Using group offsets in Zookeeper of group.id {} as starting point for partitions {}",
    +							kafkaConfig.getProperty("group.id"), partitionsWithNoOffset);
    +
    +						Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
    +						for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +							Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
    +							if (offset != null) {
    +								// the committed offset in ZK represents the next record to process,
    +								// so we subtract it by 1 to correctly represent internal state
    +								partition.setOffset(offset - 1);
    +							} else {
    +								// if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
    +								// we default to "auto.offset.reset" like the Kafka high-level consumer
    +								LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
    +									" resetting starting offset to 'auto.offset.reset'", partition);
    +
    +								partition.setOffset(invalidOffsetBehavior);
    +							}
    +						}
    +				}
    +			} else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
    --- End diff --
    
    I think this case can currently never happen, because on restore we are only adding partitions that are part of the restore.
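The GROUP_OFFSETS branch in the diff above boils down to one small per-partition rule. The following is an illustrative, self-contained sketch of that rule only; the class name, method name, and the `INVALID_OFFSET` sentinel are hypothetical stand-ins (in the real code the sentinel is the configured `invalidOffsetBehavior`), not Flink API:

```java
// Hypothetical sketch; only the subtract-by-one rule and the
// missing-offset fallback are taken from the diff above.
public class GroupOffsetSketch {

    // Stand-in for the diff's "invalidOffsetBehavior"
    // (the configured earliest/latest reset marker).
    static final long INVALID_OFFSET = Long.MIN_VALUE;

    // A committed ZK offset names the NEXT record to process, while the
    // fetcher's internal state stores the LAST record processed, hence
    // the subtraction by 1. A partition with no committed offset falls
    // back to the "auto.offset.reset" behavior, as the diff's else-branch does.
    static long initialOffset(Long committedZkOffset) {
        if (committedZkOffset != null) {
            return committedZkOffset - 1;
        }
        return INVALID_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println(initialOffset(42L));  // 41
        System.out.println(initialOffset(null)); // fallback sentinel
    }
}
```

The off-by-one matters: restoring the committed offset unadjusted would make the fetcher re-emit the record at that offset.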


> New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4280
>                 URL: https://issues.apache.org/jira/browse/FLINK-4280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in topics for the Flink Kafka consumer, users set the Kafka config {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be misleading if users were trying to find a way to "read topics from a starting position". The way the {{auto.offset.reset}} config works in the Flink Kafka consumer resembles Kafka's original intent for the setting: first, existing external offsets committed to ZK / the brokers are checked; only if none exist is {{auto.offset.reset}} respected.
> I propose to add Flink-specific ways to define the starting position that do not take the external offsets into account. The original behaviour (reference external offsets first) can become a user option, so that it can be retained for Kafka users who need to interoperate with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a warning)
> props.setProperty("group.id", "..."); // this won't have an effect on the starting position anymore (it may still be used for external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be latest
> props.setProperty("group.id", "..."); // will be used to look up external offsets in ZK / broker on startup
> ...
> {code}
> One thing we would need to decide on is what the default value for {{flink.starting-position}} should be.
> Two merits I see in adding this:
> 1. This matches the way users generally interpret "read from a starting position". Since the Flink Kafka connector is essentially a "high-level" Kafka consumer for Flink users, I think it is reasonable to add Flink-specific functionality that users will find useful, even though it wasn't part of Kafka's original consumer design.
> 2. It makes the idea that "the Kafka offset store (ZK / brokers) is only used to expose progress to the outside world, and not to manipulate how Kafka topics are read in Flink (unless users opt to do so)" even more definite and solid. There was some discussion on this aspect in this PR (https://github.com/apache/flink/pull/1690, FLINK-3398). I think adding this further decouples Flink's internal offset checkpointing from the external Kafka offset store.
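To make the proposal above concrete, here is a minimal sketch of how a consumer might resolve the proposed {{flink.starting-position}} key into a startup mode. The key and its three values come from the proposal; everything else (class and enum names, the choice of "external-offsets" as default) is a hypothetical illustration, not the actual Flink implementation, and the default is precisely the open question the proposal raises:

```java
import java.util.Properties;

// Hypothetical sketch of resolving the proposed "flink.starting-position"
// property into a startup mode; names here are illustrative only.
public class StartingPositionSketch {

    enum StartupMode { EARLIEST, LATEST, EXTERNAL_OFFSETS }

    static StartupMode resolve(Properties props) {
        // The proposal leaves the default undecided; "external-offsets"
        // is assumed here because it preserves the original behaviour.
        String value = props.getProperty("flink.starting-position", "external-offsets");
        switch (value) {
            case "earliest":
                return StartupMode.EARLIEST;
            case "latest":
                return StartupMode.LATEST;
            case "external-offsets":
                return StartupMode.EXTERNAL_OFFSETS;
            default:
                throw new IllegalArgumentException(
                    "Unknown flink.starting-position: " + value);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("flink.starting-position", "earliest");
        System.out.println(resolve(props)); // EARLIEST
    }
}
```

Under this sketch, a warning for an ignored {{auto.offset.reset}} (as the first {{code}} example describes) would be logged whenever the resolved mode is not EXTERNAL_OFFSETS.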



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
