flink-user mailing list archives

From John Smith <java.dev....@gmail.com>
Subject Re: How are kafka consumer offsets handled if sink fails?
Date Tue, 09 Jul 2019 17:30:06 GMT
Ok so just to be clear. Let's say we started at day 0...

1- Producer inserted 10 records into Kafka.
2- Kafka Flink Consumer consumed 5 records.
3- Some transformations applied to those records.
4- 4 records were written to the sink, 1 failed.
5- Flink Job restarts because of above failure.

When does the checkpoint happen above?
And in the above case, does that mean it will start back at record 0, or
will it resume from the 4th record, or from wherever the checkpoint
happened, e.g. the 3rd record?
My stored proc will be idempotent, and I understand what to do if messages
get replayed. I just want to understand when and where the checkpointing
will happen.

On Mon, 8 Jul 2019 at 22:23, Rong Rong <walterddr@gmail.com> wrote:

> Hi John,
>
> I think what Konstantin is trying to say is: Flink's Kafka consumer does
> not start consuming from the Kafka committed offset when the consumer
> starts; it actually starts with the offset that was last checkpointed to
> the external DFS. (i.e. the consumer's starting point bears no relation to
> the Kafka committed offset whatsoever - if checkpointing is enabled.)
>
> This is to quote:
> "*the Flink Kafka Consumer only commits offsets back to Kafka on a
> best-effort basis after each checkpoint. Internally, Flink "commits" the
> current Kafka offsets as part of its periodic checkpoints.*"
>
> However, if you do not enable checkpointing, I think your consumer will
> by default restart from the default Kafka offset (which I think is your
> committed group offset).
>
> --
> Rong
>
>
> On Mon, Jul 8, 2019 at 6:39 AM John Smith <java.dev.mtl@gmail.com> wrote:
>
>> So when we say a sink is at-least-once, it's because internally it's not
>> checking any kind of state and it sends what it has regardless, correct?
>> I ask because I will build a sink that calls stored procedures.
>>
>> On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf, <
>> konstantin@ververica.com> wrote:
>>
>>> Hi John,
>>>
>>> in case of a failure (e.g. in the SQL sink) the Flink job will be
>>> restarted from the last checkpoint. This means the offsets of all Kafka
>>> partitions will be reset to that point in the stream, along with the
>>> state of all operators. To enable checkpointing you need to call
>>> StreamExecutionEnvironment#enableCheckpointing(). If you are using the
>>> JDBCSinkFunction (which is an at-least-once sink), the output will be
>>> duplicated in the case of failures.
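>>>
>>> For example, a minimal sketch (the 10-second interval is just an
>>> illustrative value):
>>>
>>> StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>> // Trigger a checkpoint every 10 seconds. On failure, the job rewinds
>>> // the Kafka offsets and all operator state to the latest completed
>>> // checkpoint and replays everything after it.
>>> env.enableCheckpointing(10_000);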
>>>
>>> To answer your questions:
>>>
>>> * For this, the FlinkKafkaConsumer handles the offsets manually (no
>>> auto-commit).
>>> * No; the Flink Kafka Consumer only commits offsets back to Kafka on a
>>> best-effort basis after each checkpoint. Internally, Flink "commits" the
>>> current Kafka offsets as part of its periodic checkpoints (see the
>>> sketch after this list).
>>> * Yes, along with all other events between the last checkpoint and the
>>> failure.
>>> * It will continue from the last checkpoint.
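>>>
>>> The sketch mentioned above - how the best-effort commit back to Kafka
>>> can be toggled (setCommitOffsetsOnCheckpoints is on
>>> FlinkKafkaConsumerBase and only takes effect when checkpointing is
>>> enabled):
>>>
>>> FlinkKafkaConsumer<String> consumer =
>>>         new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
>>> // Commit offsets back to Kafka after each completed checkpoint. This
>>> // is only informational (e.g. for monitoring tools); on restart Flink
>>> // restores from its own checkpointed offsets, not from Kafka.
>>> consumer.setCommitOffsetsOnCheckpoints(true);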
>>>
>>> Hope this helps.
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> On Fri, Jul 5, 2019 at 8:37 PM John Smith <java.dev.mtl@gmail.com>
>>> wrote:
>>>
>>>> Hi using Apache Flink 1.8.0
>>>>
>>>> I'm consuming events from Kafka using nothing fancy...
>>>>
>>>> Properties props = new Properties();
>>>> props.setProperty("bootstrap.servers", kafkaAddress);
>>>> props.setProperty("group.id",kafkaGroup);
>>>>
>>>> FlinkKafkaConsumer<String> consumer =
>>>>         new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
>>>>
>>>>
>>>> Do some JSON transforms and then push to my SQL database using JDBC and
>>>> stored procedure. Let's assume the SQL sink fails.
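>>>>
>>>> (For context, a rough sketch of the kind of sink I mean - the
>>>> procedure name upsert_event and the connection details are made up:)
>>>>
>>>> import java.sql.CallableStatement;
>>>> import java.sql.Connection;
>>>> import java.sql.DriverManager;
>>>>
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>>>
>>>> public class StoredProcSink extends RichSinkFunction<String> {
>>>>     private transient Connection conn;
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) throws Exception {
>>>>         conn = DriverManager.getConnection("jdbc:...", "user", "pass");
>>>>     }
>>>>
>>>>     @Override
>>>>     public void invoke(String value, Context context) throws Exception {
>>>>         // The proc is idempotent, so replays after a restart are safe.
>>>>         try (CallableStatement stmt =
>>>>                 conn.prepareCall("{call upsert_event(?)}")) {
>>>>             stmt.setString(1, value);
>>>>             stmt.execute();
>>>>         }
>>>>     }
>>>>
>>>>     @Override
>>>>     public void close() throws Exception {
>>>>         if (conn != null) conn.close();
>>>>     }
>>>> }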
>>>>
>>>> We know that Kafka can either periodically commit offsets or it can be
>>>> done manually based on consumers logic.
>>>>
>>>> - How are the source Kafka consumer offsets handled?
>>>> - Does the Flink Kafka consumer commit the offset per event/record?
>>>> - Will that single event that failed be retried?
>>>> - So if we had 5 incoming events and, say, it failed on the 3rd one, will
>>>> it continue from the 3rd, or will the job restart and retry all 5 events?
>>>>
>>>>
>>>>
>>>
>>> --
>>>
>>> Konstantin Knauf | Solutions Architect
>>>
>>> +49 160 91394525
>>>
>>>
>>> Planned Absences: 10.08.2019 - 31.08.2019, 05.09.2019 - 06.09.2019
>>>
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>>
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>>
>>
