flink-user mailing list archives

From: Konstantin Knauf <konstan...@ververica.com>
Subject: Re: How are kafka consumer offsets handled if sink fails?
Date: Tue, 09 Jul 2019 17:51:05 GMT
Hi John,

this depends on your checkpoint interval. When enabled, checkpoints are
triggered periodically [1].
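
For example, a minimal sketch (the 10 second interval is only an
illustration; tune it for your job):

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
// Trigger a checkpoint every 10 seconds. On failure, the job rewinds to
// the last completed checkpoint, so at most roughly one interval's worth
// of records is replayed.
env.enableCheckpointing(10_000);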

Cheers,

Konstantin

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html



On Tue, Jul 9, 2019 at 7:30 PM John Smith <java.dev.mtl@gmail.com> wrote:

> OK, so just to be clear, let's say we started at day 0...
>
> 1- Producer inserted 10 records into Kafka.
> 2- Kafka Flink Consumer consumed 5 records.
> 3- Some transformations applied to those records.
> 4- 4 records were written to the sink, 1 failed.
> 5- Flink Job restarts because of above failure.
>
> When does the checkpoint happen above?
> And does it mean that in the above case it will start back at 0, or will
> it start at the 4th record and continue, or wherever the checkpoint
> happened, e.g. the 3rd record?
> My stored proc will be idempotent, and I understand what to do if
> messages get replayed.
> Just want to try to understand when and where the checkpointing will
> happen.
>
> On Mon, 8 Jul 2019 at 22:23, Rong Rong <walterddr@gmail.com> wrote:
>
>> Hi John,
>>
>> I think what Konstantin is trying to say is: Flink's Kafka consumer does
>> not start consuming from the offset committed to Kafka when starting the
>> consumer; it actually starts with the offset that was last checkpointed
>> to the external DFS. (I.e. the starting point of the consumer has no
>> relevance to the Kafka committed offset whatsoever - if checkpointing is
>> enabled.)
>>
>> This is to quote:
>> "*the Flink Kafka Consumer does only commit offsets back to Kafka on a
>> best-effort basis after every checkpoint. Internally Flink "commits" the
>> [checkpoints]->[current Kafka offset] as part of its periodic checkpoints.*
>> "
>>
>> However, if you do not enable checkpointing, I think your consumer will
>> by default restart from the default Kafka offset (which I think is your
>> committed group offset).
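>>
>> If you want that fallback to be explicit, the consumer has start-position
>> setters (a sketch against the consumer variable from your snippet; note
>> these only take effect when the job starts without checkpoint state):
>>
>> // Start from the committed group offsets in Kafka (also the default).
>> consumer.setStartFromGroupOffsets();
>> // Alternatives: consumer.setStartFromEarliest() or
>> // consumer.setStartFromLatest().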
>>
>> --
>> Rong
>>
>>
>> On Mon, Jul 8, 2019 at 6:39 AM John Smith <java.dev.mtl@gmail.com> wrote:
>>
>>> So when we say a sink is at-least-once, it's because internally it's not
>>> checking any kind of state and it sends what it has regardless, correct?
>>> Because I will build a sink that calls stored procedures.
>>>
>>> On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf, <
>>> konstantin@ververica.com> wrote:
>>>
>>>> Hi John,
>>>>
>>>> in case of a failure (e.g. in the SQL sink) the Flink job will be
>>>> restarted from the last checkpoint. This means the offsets of all Kafka
>>>> partitions will be reset to that point in the stream, along with the
>>>> state of all operators. To enable checkpointing you need to call
>>>> StreamExecutionEnvironment#enableCheckpointing(). If you are using the
>>>> JDBCSinkFunction (which is an at-least-once sink), the output will be
>>>> duplicated in the case of failures.
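>>>>
>>>> If you build your own stored-procedure sink, a rough sketch could look
>>>> like this (all names, the JDBC URL handling and the "upsert_event"
>>>> procedure are hypothetical; the point is that the procedure should be
>>>> idempotent, e.g. an upsert, because records between the last checkpoint
>>>> and a failure will be replayed):
>>>>
>>>> import java.sql.CallableStatement;
>>>> import java.sql.Connection;
>>>> import java.sql.DriverManager;
>>>>
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>>>
>>>> public class StoredProcSink extends RichSinkFunction<String> {
>>>>
>>>>     private final String jdbcUrl; // hypothetical connection string
>>>>     private transient Connection conn;
>>>>
>>>>     public StoredProcSink(String jdbcUrl) {
>>>>         this.jdbcUrl = jdbcUrl;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) throws Exception {
>>>>         conn = DriverManager.getConnection(jdbcUrl);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void invoke(String record, Context context) throws Exception {
>>>>         // Hypothetical idempotent stored procedure (e.g. an upsert).
>>>>         try (CallableStatement call =
>>>>                  conn.prepareCall("{call upsert_event(?)}")) {
>>>>             call.setString(1, record);
>>>>             call.execute(); // an exception here fails and restarts the job
>>>>         }
>>>>     }
>>>>
>>>>     @Override
>>>>     public void close() throws Exception {
>>>>         if (conn != null) {
>>>>             conn.close();
>>>>         }
>>>>     }
>>>> }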
>>>>
>>>> To answer your questions:
>>>>
>>>> * For this, the FlinkKafkaConsumer handles the offsets manually (no
>>>> auto-commit).
>>>> * No, the Flink Kafka Consumer only commits offsets back to Kafka
>>>> on a best-effort basis after every checkpoint. Internally, Flink
>>>> "commits" the current Kafka offsets as part of its periodic checkpoints.
>>>> * Yes, along with all other events between the last checkpoint and the
>>>> failure.
>>>> * It will continue from the last checkpoint.
>>>>
>>>> Hope this helps.
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> On Fri, Jul 5, 2019 at 8:37 PM John Smith <java.dev.mtl@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, I'm using Apache Flink 1.8.0.
>>>>>
>>>>> I'm consuming events from Kafka using nothing fancy...
>>>>>
>>>>> Properties props = new Properties();
>>>>> props.setProperty("bootstrap.servers", kafkaAddress);
>>>>> props.setProperty("group.id", kafkaGroup);
>>>>>
>>>>> FlinkKafkaConsumer<String> consumer =
>>>>>     new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
>>>>>
>>>>>
>>>>> I do some JSON transforms and then push to my SQL database using JDBC
>>>>> and a stored procedure. Let's assume the SQL sink fails.
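>>>>>
>>>>> Roughly, the whole pipeline looks like this (just a sketch; the
>>>>> transform and sink classes are placeholders for my actual code):
>>>>>
>>>>> StreamExecutionEnvironment env =
>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> env.addSource(consumer)
>>>>>    .map(new MyJsonTransform())       // placeholder: JSON transforms
>>>>>    .addSink(new MyStoredProcSink()); // placeholder: JDBC + stored proc
>>>>> env.execute();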
>>>>>
>>>>> We know that Kafka can either periodically commit offsets, or it can be
>>>>> done manually based on the consumer's logic.
>>>>>
>>>>> - How are the source Kafka consumer offsets handled?
>>>>> - Does the Flink Kafka consumer commit the offset per event/record?
>>>>> - Will that single event that failed be retried?
>>>>> - So if we had 5 incoming events and, say, it failed on the 3rd one,
>>>>> will it continue from the 3rd, or will the job restart and retry all 5
>>>>> events?
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>>
>>>> Konstantin Knauf | Solutions Architect
>>>>
>>>> +49 160 91394525
>>>>
>>>>
>>>> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2019
>>>>
>>>>
>>>> --
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>>
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>>>
>>>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525


Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2019


--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
