flink-dev mailing list archives

From Maximilian Michels <...@apache.org>
Subject Re: FlinkKafkaConsumer09
Date Fri, 29 Jul 2016 08:03:43 GMT
Hi Tai,

Should definitely be possible. Would you mind opening a JIRA issue
with the description you posted?

Thanks,
Max

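A minimal sketch of the starting-position semantics in Gordon's proposal (quoted in the thread), written as a plain Scala model rather than against Flink. All names here (`StartPositionSketch`, `GroupOffsets`, `Earliest`, `Latest`, `PartitionState`, `startOffset`) are hypothetical and not part of the Flink or Kafka API. The sketch assumes the current behaviour is "use the group's committed offset if one exists, otherwise fall back to `auto.offset.reset`", and it ignores the case of a committed offset that is out of range.

```scala
// Hypothetical model of the start-position options discussed in this thread.
// None of these names exist in Flink or Kafka; they only illustrate the semantics.
object StartPositionSketch {

  sealed trait StartPosition
  // Current behaviour: committed group offset first, auto.offset.reset as fallback.
  case object GroupOffsets extends StartPosition
  // Proposed: always read from the earliest offset, ignoring committed offsets.
  case object Earliest extends StartPosition
  // Proposed: always read from the latest offset, ignoring committed offsets.
  case object Latest extends StartPosition

  // Offset range of one partition plus the group's committed offset, if any.
  final case class PartitionState(earliest: Long, latest: Long, committed: Option[Long])

  // Resolve the first offset to read. Only the Kafka 0.9 reset values
  // "earliest" and "latest" are modelled; anything else is rejected.
  def startOffset(pos: StartPosition, p: PartitionState, autoOffsetReset: String): Long =
    pos match {
      case Earliest => p.earliest
      case Latest   => p.latest
      case GroupOffsets =>
        p.committed.getOrElse {
          autoOffsetReset match {
            case "earliest" => p.earliest
            case "latest"   => p.latest
            case other =>
              throw new IllegalArgumentException(s"unsupported auto.offset.reset: $other")
          }
        }
    }
}
```

Under `GroupOffsets`, a group id that already has a committed offset never consults `auto.offset.reset`, which is consistent with Max's explanation of why a fresh group id was needed; `Earliest` and `Latest` model the proposed options that ignore committed offsets entirely.
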
On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon <tzulitai@gmail.com> wrote:
> Hi Kevin,
>
> Just a re-clarification: for Kafka 0.9 it would be “earliest”, and
> “smallest” for the older Kafka 0.8.
>
> I’m wondering whether it is reasonable to add a Flink-specific way to set
> the consumer’s starting position to “earliest” or “latest” without
> respecting the external Kafka offset store. Perhaps we can keep the
> current behaviour (using the offsets committed in Kafka as the starting
> point) as one user option, and add new options to read from “earliest” or
> “latest” regardless of the groupId and externally committed offsets. I
> think this better matches how users usually interpret setting a starting
> position, while still keeping the “auto.offset.reset” behaviour that
> frequent Kafka users are used to. It would also define more clearly that,
> in the context of Flink, the external Kafka offset store is used only to
> expose the consumer’s progress to the outside world, not to manipulate
> how topics are read.
>
> Just an idea I have in mind; I'm not sure whether it would be a reasonable
> addition. It’d be great to hear what others think of this.
>
> Regards,
> Gordon
>
>
> On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jacobs@cern.ch) wrote:
>
> Thank you Gordon and Max,
>
> Thank you Gordon, that explains the behaviour a bit better to me. I am
> now adding the timestamp to the group ID and that is a good workaround
> for now. The "smallest" option is unfortunately not available in this
> version of the FlinkKafkaConsumer class.
>
> Cheers,
> Kevin
>
>
> On 28.07.2016 10:39, Maximilian Michels wrote:
>> Hi Kevin,
>>
>> You need to use properties.setProperty("auto.offset.reset",
>> "smallest") for Kafka 0.9 to start from the smallest offset. Note that
>> in Kafka 0.8 you need to use properties.setProperty("auto.offset.reset",
>> "earliest") to achieve the same behavior.
>>
>> Kafka keeps track of the offsets per group id. If you have already
>> read from a topic with a certain group id and want to restart from the
>> smallest offset available, you need to generate a unique group id.
>>
>> Cheers,
>> Max
>>
>> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs <kevin.jacobs@cern.ch> wrote:
>>> Hi,
>>>
>>> I am currently facing strange behaviour of the FlinkKafkaConsumer09
>>> class. I am using Flink 1.0.3.
>>>
>>> These are my properties:
>>>
>>> import java.util.Properties
>>>
>>> val properties = new Properties()
>>> properties.setProperty("bootstrap.servers", config.urlKafka)
>>> properties.setProperty("group.id", COLLECTOR_NAME)
>>> properties.setProperty("auto.offset.reset", "earliest")
>>>
>>> According to the new consumer API of Kafka, this should result in the
>>> following:
>>>
>>> auto.offset.reset:
>>>   * smallest: automatically reset the offset to the smallest offset
>>> (source: https://kafka.apache.org/documentation.html#newconsumerapi)
>>>
>>> However, it starts from the latest item in my topic. Is this a bug or
>>> am I doing something wrong?
>>>
>>> Regards,
>>> Kevin
>>>

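Putting the corrections from this thread together, here is a minimal sketch of Kevin's properties with the version-specific `auto.offset.reset` value ("earliest" for the 0.9 consumer, "smallest" for 0.8, per Gordon) and his workaround of adding a timestamp to the group id. The object and function names and the `kafkaVersion` string parameter are hypothetical; the sketch only builds a `java.util.Properties` and omits any other settings a particular consumer version may require.

```scala
import java.util.Properties

// Hypothetical helper; not part of the Flink or Kafka API.
object ConsumerPropsSketch {

  // auto.offset.reset value meaning "start from the beginning", per consumer version.
  def resetToBeginning(kafkaVersion: String): String = kafkaVersion match {
    case "0.8" => "smallest" // old consumer
    case "0.9" => "earliest" // new consumer
    case other => throw new IllegalArgumentException(s"unsupported Kafka version: $other")
  }

  // The timestamp suffix yields a group id with no committed offsets,
  // so auto.offset.reset decides where reading starts.
  def earliestStartProps(
      bootstrapServers: String,
      groupPrefix: String,
      kafkaVersion: String,
      nowMillis: Long = System.currentTimeMillis()): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", s"$groupPrefix-$nowMillis")
    props.setProperty("auto.offset.reset", resetToBeginning(kafkaVersion))
    props
  }
}
```

For example, `ConsumerPropsSketch.earliestStartProps(config.urlKafka, COLLECTOR_NAME, "0.9")` would stand in for the properties in Kevin's original message. The trade-off is that every run uses a new group id, so offsets committed under earlier group ids are never picked up again.
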