flink-user mailing list archives

From "Mao, Wei" <wei....@intel.com>
Subject RE: How to get latest offsets with FlinkKafkaConsumer
Date Fri, 05 Aug 2016 13:24:31 GMT
Thank you, Stefan and Gordon. It’s really helpful.

I will try the “auto.offset.reset” property. And instead of using a new consumer group every
time, I would like to clean the offsets under the current consumer group before restarting the
Flink application, in order to avoid redundant records in ZK.

Regards,
William

From: Tzu-Li (Gordon) Tai [mailto:tzulitai@gmail.com]
Sent: Friday, August 5, 2016 8:57 PM
To: user@flink.apache.org
Subject: Re: How to get latest offsets with FlinkKafkaConsumer

Hi,

Please also note that the “auto.offset.reset” property is only respected when there are
no offsets under the same consumer group in ZK. So, currently, in order to make sure you read
from the latest / earliest offsets every time you restart your Flink application, you’d
have to use a unique groupId on each restart.
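
A minimal sketch of that workaround, for illustration only: it builds the consumer properties with a fresh group.id on every start and “auto.offset.reset” set to “largest”. The class name, method name, host addresses and the “perf-test” prefix are hypothetical, and the commented-out FlinkKafkaConsumer08 call assumes the Kafka 0.8 connector that ships with Flink 1.0.x.

```java
import java.util.Properties;
import java.util.UUID;

public class LatestOffsetProps {

    // Builds properties for a Kafka 0.8 consumer. Because the group.id is new on
    // every call, no offsets exist for it in ZK, so auto.offset.reset applies.
    public static Properties build(String zookeeper, String brokers, String groupPrefix) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zookeeper);
        props.setProperty("bootstrap.servers", brokers);
        // Hypothetical naming scheme: a fixed prefix plus a random suffix.
        props.setProperty("group.id", groupPrefix + "-" + UUID.randomUUID());
        // "largest" starts from the newest offsets, "smallest" from the oldest.
        props.setProperty("auto.offset.reset", "largest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder addresses; replace with your own ZK and broker endpoints.
        Properties props = build("localhost:2181", "localhost:9092", "perf-test");
        System.out.println("group.id = " + props.getProperty("group.id"));
        // Assumed usage with the Flink Kafka 0.8 connector on the classpath:
        // env.addSource(new FlinkKafkaConsumer08<>("MyTopic", new SimpleStringSchema(), props));
    }
}
```

Note that every restart leaves another consumer group entry behind in ZK, so this trades convenience for some clutter there.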

We’re currently working on a new configuration option for the Kafka consumer to explicitly set
the starting offset / position, regardless of any existing offsets in ZK. You can follow the
corresponding JIRA here: https://issues.apache.org/jira/browse/FLINK-4280.

Regards,
Gordon


On August 5, 2016 at 8:47:32 PM, Stefan Richter (s.richter@data-artisans.com) wrote:
Sorry, I think you are actually asking for the largest offset in the Kafka source, so it
should be setProperty("auto.offset.reset", "largest").

On 05.08.2016 at 14:44, Stefan Richter <s.richter@data-artisans.com> wrote:

Hi,

I think passing properties with setProperty("auto.offset.reset", "smallest") to the Kafka
consumer should do what you want.

Best,
Stefan


On 05.08.2016 at 14:36, Mao, Wei <wei.mao@intel.com> wrote:

I am doing some performance tests with Flink (1.0.3) + Kafka (0.8.2.2), and I noticed that
when I restart my Flink application, it reads records starting from the last offset that
I consumed previously, rather than from the latest offsets of that topic in Kafka.

So is there any way to make it read from the latest offsets of broker/MyTopic instead of
consumer/MyTopic in Flink?

Thanks,
William

