flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Question about checkpointing with stateful operators and state recovery
Date Wed, 11 Oct 2017 15:10:42 GMT
Hi Federico,

I'll try and give some answers:

1. Generally speaking, no. If you use keyed state, for example obtained via the RuntimeContext,
Flink checkpoints and restores it for you, so you don't need to implement CheckpointedFunction.
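To illustrate why keyed state needs no snapshotState/initializeState, here is a toy model in plain Java. It is not Flink code: the hypothetical ToyRuntime owns the per-key state map, so it can snapshot and restore that map without any help from the user function. The names ToyRuntime, value() and update() are illustrative only; in Flink itself you would obtain a state handle such as ValueState from getRuntimeContext().getState(...) inside a rich function.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (NOT Flink's API): the "runtime" owns all keyed state, so it can
// snapshot and restore it without the user function being involved.
class KeyedStateModel {

    static final class ToyRuntime {
        private Map<String, Long> state = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) { currentKey = key; }

        // Handle seen by user code, scoped to the current key
        // (loosely mirroring ValueState.value() / update()).
        Long value() { return state.get(currentKey); }
        void update(Long v) { state.put(currentKey, v); }

        // Checkpoint and restore are done by the runtime alone.
        Map<String, Long> snapshot() { return new HashMap<>(state); }
        void restore(Map<String, Long> snap) { state = new HashMap<>(snap); }
    }

    // User function: counts events per key using only the state handle.
    // Note there is no snapshotState/initializeState here.
    static long countEvent(ToyRuntime rt, String key) {
        rt.setCurrentKey(key);
        Long current = rt.value();
        long next = (current == null ? 0L : current) + 1;
        rt.update(next);
        return next;
    }

    public static void main(String[] args) {
        ToyRuntime rt = new ToyRuntime();
        countEvent(rt, "a");
        countEvent(rt, "a");
        countEvent(rt, "b");
        Map<String, Long> checkpoint = rt.snapshot(); // a = 2, b = 1

        countEvent(rt, "a");                          // a = 3, not yet checkpointed
        rt.restore(checkpoint);                       // simulated failure and recovery

        System.out.println(countEvent(rt, "a"));      // prints 3: counting resumes from a = 2
    }
}
```

The point of the model is the division of labour: user code only reads and writes through a key-scoped handle, while checkpointing is entirely the runtime's job.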

2. You don't have to set setCommitOffsetsOnCheckpoints(true). It only controls whether offsets
are committed back to Kafka, which matters if other systems want to check those offsets; on
recovery, the consumer's offsets are restored from Flink's own checkpoint. To get exactly-once
semantics you have two general paths: 1) your sink is idempotent, meaning it doesn't matter
whether you write the same output multiple times, or 2) the sink is integrated with Flink
checkpointing and transactions. 2) was not easily possible for Kafka until Kafka 0.11 introduced
transaction support. Flink 1.4 will have a Kafka 0.11 producer that supports transactions, so
with that you can have end-to-end exactly-once.
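The idempotent path can be sketched with a small plain-Java simulation (not a Flink or Kafka API; AppendingSink and UpsertSink are hypothetical names). After a failure, records written since the last checkpoint are replayed: an appending sink emits them twice, while an upsert sink keyed by a deterministic key ends up in the same final state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model: compares an appending sink with an idempotent (upsert) sink
// when recovery from a checkpoint replays some records.
class IdempotentSinkDemo {

    // Appending sink: every write adds a row, so replayed records are duplicated.
    static final class AppendingSink {
        final List<String> rows = new ArrayList<>();
        void write(String key, long count) { rows.add(key + "=" + count); }
    }

    // Idempotent sink: writes are upserts by key, so rewriting the same
    // record leaves the output unchanged.
    static final class UpsertSink {
        final Map<String, Long> table = new LinkedHashMap<>();
        void write(String key, long count) { table.put(key, count); }
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a"};
        long[] counts = {1, 1, 2};
        AppendingSink append = new AppendingSink();
        UpsertSink upsert = new UpsertSink();

        // First attempt writes all three records, then the job fails.
        for (int i = 0; i < keys.length; i++) {
            append.write(keys[i], counts[i]);
            upsert.write(keys[i], counts[i]);
        }
        // Recovery from a checkpoint taken after record 0 replays records 1 and 2.
        for (int i = 1; i < keys.length; i++) {
            append.write(keys[i], counts[i]);
            upsert.write(keys[i], counts[i]);
        }

        System.out.println(append.rows);  // [a=1, b=1, a=2, b=1, a=2]
        System.out.println(upsert.table); // {a=2, b=1}
    }
}
```

This only works because the replayed values are the same as the first time, which holds when operator state is restored from the same checkpoint as the source offsets. The transactional path instead holds back output until a checkpoint completes, which is what the Kafka 0.11 producer mentioned above relies on.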

3. The advantage of externalised checkpoints is that they are not deleted when you cancel a job,
whereas regular checkpoints are deleted when you manually cancel a job. This lets you resume a
cancelled job from its last externalised checkpoint. There are plans to make all checkpoints
"externalised" in Flink 1.4.

4. Yes, you are correct. :-)
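The scenario in question 4 of the quoted message can be sketched as a toy simulation in plain Java (not Flink code). A replayable log stands in for Kafka, a per-key counter for the stateful keyed operator, and a list for the at-least-once sink. CHECKPOINT_INTERVAL (counted in records rather than seconds) and the other names are hypothetical. On failure, both the source offset and the operator state are taken from the last checkpoint, independent of any offsets committed back to Kafka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy simulation: replayable source -> keyed counting operator -> stateless sink.
class RecoveryScenario {

    static final int CHECKPOINT_INTERVAL = 3; // hypothetical, stands in for "every 10 seconds"

    // A checkpoint captures the source offset and the operator state together.
    static final class Checkpoint {
        final int offset;
        final Map<String, Long> counts;
        Checkpoint(int offset, Map<String, Long> counts) {
            this.offset = offset;
            this.counts = new HashMap<>(counts);
        }
    }

    // Runs the pipeline over 'log', failing once just before record 'failAt'.
    // Returns everything the at-least-once sink received.
    static List<String> run(String[] log, int failAt) {
        List<String> sinkOutput = new ArrayList<>();
        Map<String, Long> counts = new HashMap<>();
        Checkpoint last = new Checkpoint(0, counts);
        boolean failed = false;
        int offset = 0;

        while (offset < log.length) {
            if (!failed && offset == failAt) {
                failed = true;
                // Recovery: rewind the source and restore state from the checkpoint.
                offset = last.offset;
                counts = new HashMap<>(last.counts);
                continue;
            }
            String key = log[offset];
            long c = counts.merge(key, 1L, Long::sum);
            sinkOutput.add(key + "=" + c);
            offset++;
            if (offset % CHECKPOINT_INTERVAL == 0) {
                last = new Checkpoint(offset, counts);
            }
        }
        return sinkOutput;
    }

    public static void main(String[] args) {
        String[] log = {"a", "b", "a", "b", "a"};
        System.out.println(run(log, 4)); // [a=1, b=1, a=2, b=2, b=2, a=3]
    }
}
```

The duplicated "b=2" is the record processed after the checkpoint at offset 3 and replayed after recovery; the counts themselves stay correct because the state was rewound together with the offset.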


> On 28. Sep 2017, at 11:46, Federico D'Ambrosio <federico.dambrosio@smartlab.ws> wrote:
> Hi, I've got a couple of questions concerning the topics in the subject:
>     1. If an operator is applied on a keyed stream, do I still have to implement the
>        CheckpointedFunction trait and define the snapshotState and initializeState methods
>        in order to successfully recover the state after a job failure?
>     2. While using a FlinkKafkaConsumer, enabling checkpointing allows end-to-end exactly-once
>        semantics, provided that the sink is able to guarantee the same. Do I have to set
>        setCommitOffsetsOnCheckpoints(true)? How would someone implement exactly-once
>        semantics in a sink?
>     3. What are the advantages of externalized checkpoints, and in which cases would I want
>        to use them?
>     4. Let's suppose a scenario where checkpointing is enabled every 10 seconds, and I have
>        a Kafka consumer which is set to start from the latest records, a sink providing
>        at-least-once semantics, and a stateful keyed operator in between the consumer and
>        the sink. Is it correct that, in case of a task failure, the following happens?
>         - the Kafka consumer gets reverted to the latest checkpointed offset (does this
>           happen even if I don't set setCommitOffsetsOnCheckpoints(true)?)
>         - the operator state gets reverted to the latest checkpoint
>         - the sink is stateless, so it doesn't really care about what happened
>         - the stream restarts, and probably some of the events reaching the sink have
>           already been processed before
> Thank you for your attention,
> Kind regards,
> Federico
