flink-dev mailing list archives

From Paris Carbone <par...@kth.se>
Subject Re: Incremental checkpoints for Flink
Date Mon, 21 Dec 2015 12:04:58 GMT
Hi Marius,

This is a good and quite challenging topic for a thesis! Your thoughts are pretty much
aligned with strategies that we have discussed in the past.
There are some initial steps in the making in that direction. For example, Aljoscha
proposed specialized operator state (KVState) types earlier this month (e.g. Maps, Lists)
that can potentially support incremental snapshotting. I think that the incremental snapshotting
strategy fits pretty well with mutable backend storage (e.g. SQL databases, KV stores, etc.).
From a quick look, option I is close to what most of us have in mind, I guess. The second option
is quite tricky since it is not always possible to define what a "diff" is.

If you want a more rigorous overview of how the snapshotting mechanism works, you can
take a look at the arXiv paper [1] we submitted earlier this year, as well as the blog post, docs [2,3]
and technical presentations [4] on that subject.

In case you decide to go for this topic, it would be good to check the implementation of the
current state coordination and sync with us on the best way to proceed. The CheckpointCoordinator
is a good starting point [5]. I would also be happy to co-supervise you or direct you if you


[1] http://arxiv.org/abs/1506.08603
[2] https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
[3] http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
[4] http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha
[5] https://github.com/apache/flink/blob/55fd5f32d7ef0292a01192ab08456fae49b91791/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

On 19 Dec 2015, at 17:04, Marius Melzer <marius@rasumi.net> wrote:

Hello everyone,

in order to look for a subject for my diploma thesis, I was at the Flink
Forward conference in October. I talked to Aljoscha and some others
there, and after the Fault Tolerance talk on day 2 I arrived at the idea
that incremental checkpointing of the state of a process when a
barrier arrives would probably be a nice feature for Flink and a good
topic for my thesis. This would be especially interesting for very
large, e.g. key-value based, states that are necessary for scenarios
like decentralised materialised views ([1], [2]). Independently, Asterios
from the TU Berlin suggested the same topic to me when I met him. This
is nothing new, e.g. Apache Samza also does incremental backup of internal
state by writing every change to a special Kafka topic from
which it can be restored when something fails. The approach for Flink
would rather be an adaptation of the current checkpointing mechanism.

So my main questions are:
* would incremental checkpoints be an appreciated change and do you
think it would fit a diploma thesis by the effort that's necessary?
* is there already someone working in this area?

I already put some initial thoughts into how it might be possible to
achieve the goal:

How to checkpoint:
(I) Memorize which changes have been made since the last checkpoint
 - Pro: Lightweight solution, since only the things that changed need
to be compressed and transferred
 - Contra: You would want to support this not only for each "state
variable" but also inside them, e.g. for lists, key-value structures,
everything. Unfortunately there doesn't seem to be a way to
observe changes made to plain Java collections or objects in general (or
is there?). So you would need to use a different collection library or a
wrapper around the existing Java standard ones.
 - I could imagine the checkpointing somehow like this:
   (1) The programmer of the transformation (with state) uses for the
OperatorState a (wrapped) collection/other type that implements a
certain interface (e.g. "IncrementallyCheckpointed") that demands
something like a changesSinceLastCheckpoint() function
   (2) The Flink runtime would check whether the state implements
IncrementallyCheckpointed and, if so, call the
changesSinceLastCheckpoint() function.
   (3) There would be the need to differentiate between "regular/full"
checkpoints of a state and "incremental" ones when transferring the
checkpoint to the backup/checkpoint server.
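
Steps (1) and (2) above could be sketched roughly as follows. This is only an illustration of the idea, not Flink code: IncrementallyCheckpointed and changesSinceLastCheckpoint() are the names proposed above, while MapDelta, TrackedMap and SnapshotHelper are hypothetical helpers made up for this example. The wrapper tracks touched keys in a plain HashMap.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical interface named in step (1); it is not part of Flink. */
interface IncrementallyCheckpointed<D> {
    /** Returns the changes since the previous call and resets the tracking. */
    D changesSinceLastCheckpoint();
}

/** Hypothetical delta for key-value state: entries to write and keys to delete. */
final class MapDelta<K, V> {
    public final Map<K, V> upserts;
    public final Set<K> removals;

    MapDelta(Map<K, V> upserts, Set<K> removals) {
        this.upserts = upserts;
        this.removals = removals;
    }
}

/** Wrapper around a standard HashMap that remembers which keys were touched. */
final class TrackedMap<K, V> implements IncrementallyCheckpointed<MapDelta<K, V>> {
    private final Map<K, V> data = new HashMap<>();
    private final Set<K> dirty = new HashSet<>();

    public V get(K key) {
        return data.get(key);
    }

    public void put(K key, V value) {
        data.put(key, value);
        dirty.add(key);
    }

    public void remove(K key) {
        if (data.containsKey(key)) {
            data.remove(key);
            dirty.add(key);
        }
    }

    @Override
    public MapDelta<K, V> changesSinceLastCheckpoint() {
        Map<K, V> upserts = new HashMap<>();
        Set<K> removals = new HashSet<>();
        for (K key : dirty) {
            if (data.containsKey(key)) {
                upserts.put(key, data.get(key)); // key was added or updated
            } else {
                removals.add(key);               // key was deleted
            }
        }
        dirty.clear();
        return new MapDelta<>(upserts, removals);
    }
}

/** Step (2): the runtime asks for a delta only if the state type offers one. */
final class SnapshotHelper {
    static Object snapshot(Object state) {
        if (state instanceof IncrementallyCheckpointed) {
            return ((IncrementallyCheckpointed<?>) state).changesSinceLastCheckpoint();
        }
        return state; // fall back to a regular, full snapshot
    }
}
```

The extra memory is limited to the set of touched keys, but every mutation has to go through the wrapper, which is the cost named in the "Contra" point.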

(II) Keep last state and make a diff (preferably with the already
serialised checkpoint):
 - Pro: Much easier solution, doesn't need wrapping or adapting of
collections or other types, very general approach, the transferred data
shouldn't be more than in case (I) - maybe in some cases even less
 - Contra: Would usually almost double the memory needs of the
transformation, for large collections this would also mean quite some
processing effort for computing the diff
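
One possible way to do the diff on the serialised form is to compare fixed-size chunks of the previous and the current checkpoint bytes and transfer only the chunks that differ. The sketch below assumes the state has already been serialised to a byte array; ChunkDiff, diff and patch are hypothetical names for this example, and a real implementation might prefer a smarter diff algorithm.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical chunk-level diff between two serialised checkpoints. */
final class ChunkDiff {

    /** Returns the chunks of `current` (by chunk index) that differ from `previous`. */
    static Map<Integer, byte[]> diff(byte[] previous, byte[] current, int chunkSize) {
        Map<Integer, byte[]> changed = new TreeMap<>();
        int index = 0;
        for (int offset = 0; offset < current.length; offset += chunkSize, index++) {
            int end = Math.min(offset + chunkSize, current.length);
            byte[] cur = Arrays.copyOfRange(current, offset, end);
            boolean same = false;
            if (offset < previous.length) {
                int prevEnd = Math.min(offset + chunkSize, previous.length);
                same = Arrays.equals(cur, Arrays.copyOfRange(previous, offset, prevEnd));
            }
            if (!same) {
                changed.put(index, cur);
            }
        }
        return changed;
    }

    /** Rebuilds the current checkpoint from the previous one plus the changed chunks. */
    static byte[] patch(byte[] previous, Map<Integer, byte[]> changed, int newLength, int chunkSize) {
        byte[] result = Arrays.copyOf(previous, newLength); // truncates or zero-pads
        for (Map.Entry<Integer, byte[]> e : changed.entrySet()) {
            byte[] chunk = e.getValue();
            System.arraycopy(chunk, 0, result, e.getKey() * chunkSize, chunk.length);
        }
        return result;
    }
}
```

The sketch makes both "Contra" points visible: the previous checkpoint has to stay in memory next to the live state, and every checkpoint scans the full serialised state once.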

(III?) Is there another kind of approach you could imagine?

Which parts need to change:
 - The checkpointing of the transformation state (but not the restoring
of the state, this stays the same)
 - The protocol of how to transfer the checkpoints needs at least
metadata (full/normal checkpoint vs. incremental)
 - The checkpoint server needs to be able to update its current state
from the diffs/changes it receives
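
The last two points could look roughly like the sketch below on the receiving side. It assumes key-value state and a delta made of written entries plus deleted keys; CheckpointStore, Kind and receive are hypothetical names for this example and do not correspond to any existing Flink class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical checkpoint server state for one operator with key-value state. */
final class CheckpointStore<K, V> {

    /** Metadata sent with every checkpoint: full/normal vs. incremental. */
    enum Kind { FULL, INCREMENTAL }

    private final Map<K, V> state = new HashMap<>();
    private long lastCheckpointId = -1;

    void receive(long checkpointId, Kind kind, Map<K, V> upserts, Set<K> removals) {
        if (kind == Kind.FULL) {
            state.clear(); // a full checkpoint replaces everything stored so far
        } else if (lastCheckpointId < 0) {
            throw new IllegalStateException("incremental checkpoint without a base");
        }
        for (K key : removals) {
            state.remove(key);
        }
        state.putAll(upserts);
        lastCheckpointId = checkpointId;
    }

    /** Restoring stays the same as today: the operator gets the complete state back. */
    Map<K, V> restore() {
        return new HashMap<>(state);
    }
}
```

Because the server folds every delta into its current copy, restoring never needs to replay a chain of incremental checkpoints.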

I would really appreciate help with and an assessment of these ideas and the
general subject. Also, if someone could give me a quick overview of
the details of the current checkpointing (and which parts of the code
are worth exploring), I'd be happy about that too!

Thanks in advance,
