flink-issues mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2976) Save and load checkpoints manually
Date Thu, 05 Nov 2015 12:47:27 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14991613#comment-14991613 ]

Aljoscha Krettek commented on FLINK-2976:
-----------------------------------------

Yes, it is definitely required. However, we have to think about how this will work in the
future with changed topologies. (For example, adding a simple stateless mapper to a transformation
would change the topology.)

I think we need a way for users to specify a unique ID for each stateful operator so that state
can be mapped to an operator. Then, if the user extends the topology and restarts from a previous
state, the existing state could be mapped to the correct operators.
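
To make the ID idea concrete, here is a minimal sketch in plain Java. It is not Flink API: the class `StateRemapping`, the method `restore`, and the operator IDs are hypothetical names chosen for illustration, and state is modelled as an opaque byte array. The point is only that saved state is looked up by a user-assigned ID rather than by position in the topology, so operators added later simply start without state.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not part of Flink: maps saved state to operators by user-assigned ID.
final class StateRemapping {

    /**
     * Returns the saved state for every stateful operator ID in the new topology
     * that also appears in the saved checkpoint. Operators without a matching
     * entry (for example, newly added ones) are left out and start empty.
     */
    static Map<String, byte[]> restore(Map<String, byte[]> savedStateById,
                                       List<String> statefulOperatorIds) {
        Map<String, byte[]> restored = new LinkedHashMap<>();
        for (String id : statefulOperatorIds) {
            byte[] state = savedStateById.get(id);
            if (state != null) {
                restored.put(id, state);
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        // State saved from the old topology, keyed by made-up operator IDs.
        Map<String, byte[]> saved = new HashMap<>();
        saved.put("kafka-source", new byte[] {1});
        saved.put("window-count", new byte[] {2});

        // The extended topology adds a new stateful operator "dedup".
        List<String> newTopology = Arrays.asList("kafka-source", "dedup", "window-count");

        System.out.println(restore(saved, newTopology).keySet());
    }
}
```

Stateless operators never appear in the lookup, so inserting a stateless mapper would not disturb the mapping in this model. Scale-in/scale-out is deliberately not covered by the sketch.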

By the way, this also becomes very tricky once you start thinking about scale-in/scale-out.
 

> Save and load checkpoints manually
> ----------------------------------
>
>                 Key: FLINK-2976
>                 URL: https://issues.apache.org/jira/browse/FLINK-2976
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>    Affects Versions: 0.10
>            Reporter: Ufuk Celebi
>             Fix For: 1.0
>
>
> Currently, all checkpointed state is bound to a job. After the job finishes, all state
> is lost. In case of an HA cluster, jobs can live longer than the cluster, but they still suffer
> from the same issue when they finish.
> Multiple users have requested the feature to manually save a checkpoint in order to resume
> from it at a later point. This is especially important for production environments. As an
> example, consider upgrading your existing production Flink program. Currently, you lose all
> the state of your program. With the proposed mechanism, it will be possible to save a checkpoint,
> stop and update your program, and then continue your program with the checkpoint.
> The required operations can be simple:
> saveCheckpoint(JobID) => checkpointID: long
> loadCheckpoint(JobID, long) => void
> For the initial version, I would apply the following restriction:
> - The topology needs to stay the same (JobGraph parallelism, etc.)
> A user can configure this behaviour via the environment, like the checkpointing interval.
> Furthermore, the user can trigger the save operation via the command line at arbitrary times
> and load a checkpoint when submitting a job, e.g.
> bin/flink checkpoint <JobID> => checkpointID: long 
> and
> bin/flink run --loadCheckpoint JobID [latest saved checkpoint]
> bin/flink run --loadCheckpoint (JobID,long) [specific saved checkpoint]
> As far as I can tell, the required mechanisms are similar to the ones implemented for
> JobManager high availability. We need to make sure to persist the CompletedCheckpoint instances
> as a pointer to the checkpoint state and to *not* remove saved checkpoint state.
> On the client side, we need to give the job and its vertices the same IDs to allow mapping
> the checkpoint state.
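
The saveCheckpoint/loadCheckpoint operations from the description above could be sketched roughly as follows. This is an in-memory illustration under stated assumptions, not the proposed implementation: `SavedCheckpointStore` and `loadLatest` are hypothetical names, the job ID is a plain string, the checkpoint is represented by an opaque pointer string, and `loadCheckpoint` returns that pointer instead of `void` so the result can be inspected. The one property it tries to show is that saved checkpoints are kept until explicitly removed, and that both "latest" and "specific" lookups are possible.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

// Hypothetical in-memory sketch of the save/load operations; not Flink code.
final class SavedCheckpointStore {

    // jobId -> (checkpointId -> opaque pointer to the persisted checkpoint state)
    private final Map<String, TreeMap<Long, String>> saved = new HashMap<>();

    /** Records a saved checkpoint for the job and returns its new checkpoint ID. */
    long saveCheckpoint(String jobId, String statePointer) {
        TreeMap<Long, String> forJob = saved.computeIfAbsent(jobId, k -> new TreeMap<>());
        long id = forJob.isEmpty() ? 1L : forJob.lastKey() + 1L;
        forJob.put(id, statePointer);
        return id;
    }

    /** Looks up a specific saved checkpoint, as in "--loadCheckpoint (JobID,long)". */
    String loadCheckpoint(String jobId, long checkpointId) {
        TreeMap<Long, String> forJob = saved.get(jobId);
        if (forJob == null || !forJob.containsKey(checkpointId)) {
            throw new NoSuchElementException(
                    "No saved checkpoint " + checkpointId + " for job " + jobId);
        }
        return forJob.get(checkpointId);
    }

    /** Looks up the latest saved checkpoint, as in "--loadCheckpoint JobID". */
    String loadLatest(String jobId) {
        TreeMap<Long, String> forJob = saved.get(jobId);
        if (forJob == null || forJob.isEmpty()) {
            throw new NoSuchElementException("No saved checkpoint for job " + jobId);
        }
        return forJob.lastEntry().getValue();
    }
}
```

A real version would persist these entries durably rather than hold them in memory, which is where the similarity to the JobManager high-availability mechanisms mentioned above comes in.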



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
