flink-issues mailing list archives

From "Xiaogang Shi (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5053) Incremental / lightweight snapshots for checkpoints
Date Thu, 17 Nov 2016 09:38:59 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15673253#comment-15673253 ]

Xiaogang Shi commented on FLINK-5053:

[~srichter] Do you have a more detailed plan for incremental checkpoints?

I think much more work is needed to make it work. One big problem is the concurrent modification
made by the TaskExecutors and the JobMaster.

Currently, both the state handles and the snapshot data (the files on HDFS) are deleted
by the JobMaster. With incremental checkpoints, a file may be shared by multiple checkpoints.

One method is to synchronize the access of the JobMaster and the TaskExecutors. Another solution,
I think, is to let the TaskExecutors delete these snapshot files.
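
To make the first option a bit more concrete, here is a rough sketch of how the JobMaster could reference-count the shared snapshot files and only delete a file once no registered checkpoint uses it anymore (the class and method names below are made up for illustration, they are not existing Flink APIs):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical registry on the JobMaster side: every checkpoint registers the
    // shared files it references; a file is only deleted from HDFS when its
    // reference count drops to zero.
    public class SharedSnapshotFileRegistry {

        private final Map<String, Integer> referenceCounts = new HashMap<>();

        public synchronized void register(String filePath) {
            Integer count = referenceCounts.get(filePath);
            referenceCounts.put(filePath, count == null ? 1 : count + 1);
        }

        public synchronized void unregister(String filePath) {
            Integer count = referenceCounts.get(filePath);
            if (count == null) {
                return;
            }
            if (count <= 1) {
                referenceCounts.remove(filePath);
                deleteFromDfs(filePath);
            } else {
                referenceCounts.put(filePath, count - 1);
            }
        }

        private void deleteFromDfs(String filePath) {
            // Placeholder: here the JobMaster would actually delete the file on HDFS.
        }
    }

The alternative, letting the TaskExecutors delete the files, would move this bookkeeping from the JobMaster to the task side instead.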

Do you have any ideas about this problem?

> Incremental / lightweight snapshots for checkpoints
> ---------------------------------------------------
>                 Key: FLINK-5053
>                 URL: https://issues.apache.org/jira/browse/FLINK-5053
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
> There is currently basically no difference between savepoints and checkpoints in Flink,
> and both are created through exactly the same process.
> However, savepoints and checkpoints have slightly different meanings, which we should
> take into account to keep Flink efficient:
> - Savepoints are (typically infrequently) triggered by the user to create a state from
> which the application can be restarted, e.g. because Flink, some code, or the parallelism
> needs to be changed.
> - Checkpoints are (typically frequently) triggered by the system to allow for fast recovery
> in case of failure, while keeping the job/system unchanged.
> This means that savepoints and checkpoints can have different properties in that:
> - Savepoints should represent a state of the application where characteristics of the
> job (e.g. parallelism) can be adjusted for the next restart. One example of something that
> savepoints need to be aware of is key-groups. Savepoints can potentially be a little more
> expensive than checkpoints, because they are usually created a lot less frequently by the user.
> - Checkpoints are frequently triggered by the system to allow for fast failure recovery.
> However, failure recovery leaves all characteristics of the job unchanged. Thus, checkpoints
> do not have to be aware of those, e.g. think again of key-groups. Checkpoints should run faster
> than creating savepoints; in particular, it would be nice to have incremental checkpoints.
> For a first approach, I would suggest the following steps/changes:
> - In checkpoint coordination: differentiate between triggering checkpoints
> and savepoints. Introduce properties for checkpoints that describe their set of abilities,
> e.g. "is-key-group-aware", "is-incremental" (a sketch of such properties follows below).
> - In state handle infrastructure: introduce state handles that reflect incremental checkpoints
> and drop full key-group awareness, i.e. covering folders instead of files and not having a
> keygroup_id -> file/offset mapping, but rather a keygroup_range -> folder mapping? (A sketch of
> such a handle follows below.)
> - Backend side: we should start with RocksDB by reintroducing something similar to semi-async
> snapshots, but using BackupableDBOptions::setShareTableFiles(true) and transferring only the new
> incremental outputs to HDFS (a sketch follows below). Notice that using RocksDB's internal backup
> mechanism gives up the information about individual key-groups. But as explained above, this
> should be totally acceptable for checkpoints, while savepoints should use the key-group-aware
> fully async mode. Of course, we also need to implement the ability to restore from both types of
> snapshots.
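> A rough sketch of what such checkpoint properties could look like (the class and method names
> below are only illustrative, not existing Flink API):
>
>     // Hypothetical descriptor attached to a triggered snapshot, telling the
>     // backends which abilities the snapshot must have.
>     public class CheckpointProperties {
>
>         private final boolean keyGroupAware;
>         private final boolean incremental;
>
>         public CheckpointProperties(boolean keyGroupAware, boolean incremental) {
>             this.keyGroupAware = keyGroupAware;
>             this.incremental = incremental;
>         }
>
>         public boolean isKeyGroupAware() {
>             return keyGroupAware;
>         }
>
>         public boolean isIncremental() {
>             return incremental;
>         }
>
>         // Savepoints: key-group aware, always full.
>         public static CheckpointProperties forSavepoint() {
>             return new CheckpointProperties(true, false);
>         }
>
>         // Checkpoints: may drop key-group awareness and be incremental.
>         public static CheckpointProperties forCheckpoint() {
>             return new CheckpointProperties(false, true);
>         }
>     }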
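> A rough sketch of such a folder-based, incremental state handle (again, the names are made up;
> key-group ranges are represented as plain strings only to keep the sketch self-contained):
>
>     import java.io.Serializable;
>     import java.util.HashMap;
>     import java.util.Map;
>
>     // Hypothetical handle for an incremental checkpoint: instead of a
>     // keygroup_id -> file/offset mapping, it only records which folder on
>     // HDFS covers which key-group range.
>     public class IncrementalKeyGroupsStateHandle implements Serializable {
>
>         // e.g. "kg-0-63" -> "hdfs:///checkpoints/chk-42/kg-0-63/"
>         private final Map<String, String> keyGroupRangeToFolder;
>
>         public IncrementalKeyGroupsStateHandle(Map<String, String> keyGroupRangeToFolder) {
>             this.keyGroupRangeToFolder = new HashMap<>(keyGroupRangeToFolder);
>         }
>
>         public String getFolderFor(String keyGroupRange) {
>             return keyGroupRangeToFolder.get(keyGroupRange);
>         }
>     }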
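> For the backend side, a rough sketch of using RocksDB's backup engine with shared table files
> (the paths are placeholders, and the exact RocksJava API may differ slightly between versions):
>
>     import org.rocksdb.BackupEngine;
>     import org.rocksdb.BackupableDBOptions;
>     import org.rocksdb.Env;
>     import org.rocksdb.Options;
>     import org.rocksdb.RocksDB;
>
>     public class IncrementalBackupSketch {
>
>         public static void main(String[] args) throws Exception {
>             RocksDB.loadLibrary();
>
>             try (Options options = new Options().setCreateIfMissing(true);
>                  RocksDB db = RocksDB.open(options, "/tmp/rocksdb-instance");
>                  // Share SST files across backups, so that a new backup only adds
>                  // the SST files created since the previous backup.
>                  BackupableDBOptions backupOptions =
>                          new BackupableDBOptions("/tmp/rocksdb-backups").setShareTableFiles(true);
>                  BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {
>
>                 db.put("key".getBytes(), "value".getBytes());
>
>                 // Only the newly created SST files end up in the backup directory;
>                 // these incremental outputs are what would be transferred to HDFS.
>                 backupEngine.createNewBackup(db, true);
>             }
>         }
>     }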

This message was sent by Atlassian JIRA
