flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8360) Implement task-local state recovery
Date Sat, 10 Feb 2018 05:06:05 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359253#comment-16359253 ]

ASF GitHub Bot commented on FLINK-8360:

Github user bowenli86 commented on a diff in the pull request:

    --- Diff: docs/ops/state/large_state_tuning.md ---
    @@ -234,4 +234,97 @@ Compression can be activated through the `ExecutionConfig`:
     **Notice:** The compression option has no impact on incremental snapshots, because they
are using RocksDB's internal
     format which is always using snappy compression out of the box.
    +## Task-Local Recovery
    +### Motivation
    +In Flink's checkpointing, each task produces a snapshot of its state that is then written
to a distributed store. Each task acknowledges
    +a successful write of the state to the job manager by sending a handle that describes
the location of the state in the distributed store.
    +The job manager, in turn, collects the handles from all tasks and bundles them into a
checkpoint object.
    +In case of recovery, the job manager opens the latest checkpoint object and sends the
handles back to the corresponding tasks, which can
    +then restore their state from the distributed storage. Using a distributed storage to
store state has two important advantages. First, the storage
    +is fault tolerant and second, all state in the distributed store is accessible to all
nodes and can be easily redistributed (e.g. for rescaling).
    +However, using a remote distributed store also has one big disadvantage: all tasks must
read their state from a remote location, over the network.
    +In many scenarios, recovery could reschedule failed tasks to the same task manager as
in the previous run (of course there are exceptions like machine
    +failures), but we still have to read remote state. This can result in *long recovery
times for large states*, even if there was only a small failure on
    +a single machine.
    +### Approach
    +Task-local state recovery targets exactly this problem of long recovery times and the
main idea is the following: for every checkpoint, we do not
    +only write task states to the distributed storage, but also keep *a secondary copy of
the state snapshot in a storage that is local to the task*
    +(e.g. on local disk or in memory). Notice that the primary store for snapshots must still
be the distributed store, because local storage does not
    +ensure durability under node failures and also does not provide access for other nodes
to redistribute state; this functionality still requires the
    +primary copy.
    +However, for each task that can be rescheduled to the previous location for recovery,
we can restore state from the secondary, local
    +copy and avoid the costs of reading the state remotely. Given that *many failures are
not node failures and node failures typically only affect one
    +or very few nodes at a time*, it is very likely that in a recovery most tasks can return
to their previous location and find their local state intact.
    +This is what makes local recovery effective in reducing recovery time.
    +Please note that this can come at some additional costs per checkpoint for creating and
storing the secondary local state copy, depending on the
    +chosen state backend and checkpointing strategy. For example, in most cases the implementation
will simply duplicate the writes to the distributed
    +store to a local file.
    +<img src="../../fig/local_recovery.png" class="center" width="80%" alt="Illustration
of checkpointing with task-local recovery."/>
    +### Relationship of primary (distributed store) and secondary (task-local) state snapshots
    +Task-local state is always considered a secondary copy; the ground truth of the checkpoint
state is the primary copy in the distributed store. This
    +has implications for problems with local state during checkpointing and recovery:
    +- For checkpointing, the *primary copy must be successful* and a failure to produce the
*secondary, local copy will not fail* the checkpoint. A checkpoint
    +will fail if the primary copy could not be created, even if the secondary copy was successfully created.
    +- Only the primary copy is acknowledged and managed by the job manager; secondary copies
are owned by task managers and their life cycle can be
    +independent from their primary copy. For example, it is possible to retain a history
of the 3 latest checkpoints as primary copies and only keep
    +the task-local state of the latest checkpoint.
    +- For recovery, Flink will always *attempt to restore from task-local state first*, if
a matching secondary copy is available. If any problem occurs during
    +the recovery from the secondary copy, Flink will *transparently retry to recover the
task from the primary copy*. Recovery only fails if both the primary
    +and the (optional) secondary copy failed. In this case, depending on the configuration
Flink could still fall back to an older checkpoint.
    --- End diff --
    secondary cop**ies**
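
For readers following the diff above, the checkpointing and recovery rules in the bullet list can be sketched roughly as follows. This is only an illustration of the described semantics, not Flink code: the class `LocalRecoverySketch` and the methods `checkpointSucceeds` and `restore` are hypothetical names invented for this example, and state is modeled as a plain `Supplier`.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of the primary/secondary rules; none of these names are Flink APIs. */
final class LocalRecoverySketch {

    /**
     * A checkpoint succeeds if and only if the primary (distributed) copy was written.
     * The secondary, task-local copy is best effort and never decides the outcome.
     */
    static boolean checkpointSucceeds(boolean primaryWritten, boolean secondaryWritten) {
        return primaryWritten;
    }

    /**
     * Restores state by trying the task-local copy first and falling back to the
     * primary copy. localCopy may be null when no matching secondary copy exists.
     * If the primary copy also fails, its exception propagates and recovery fails.
     */
    static <S> S restore(Supplier<S> localCopy, Supplier<S> primaryCopy) {
        if (localCopy != null) {
            try {
                return localCopy.get();
            } catch (RuntimeException localFailure) {
                // Any problem with the local copy is tolerated: fall through to the primary copy.
            }
        }
        return primaryCopy.get();
    }

    public static void main(String[] args) {
        // Local copy is broken, so the restore transparently falls back to the primary copy.
        String state = LocalRecoverySketch.<String>restore(
                () -> { throw new IllegalStateException("local file missing"); },
                () -> "state-from-distributed-store");
        System.out.println(state);

        // A failed secondary write does not fail the checkpoint.
        System.out.println(checkpointSucceeds(true, false));
    }
}
```

The sketch leaves out the last fallback mentioned in the text (restoring from an older checkpoint), since that depends on configuration not shown in the diff.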

> Implement task-local state recovery
> -----------------------------------
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
> This issue tracks the development of recovery from task-local state. The main idea is
to have a secondary, local copy of the checkpointed state, while there is still a primary
copy in DFS that we report to the checkpoint coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, to save
network bandwidth. This requires that the assignment from tasks to slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and can easily
enhance it to all other state types (e.g. operator state) later.

This message was sent by Atlassian JIRA
