flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
Date Tue, 19 Sep 2017 13:34:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171689#comment-16171689
] 

ASF GitHub Bot commented on FLINK-7449:
---------------------------------------

Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4543#discussion_r139687055
  
    --- Diff: docs/ops/state/checkpoints.md ---
    @@ -99,3 +99,296 @@ above).
     ```sh
     $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
     ```
    +
    +## Incremental Checkpoints
    +
    +### Synopsis
    +
    +Incremental checkpoints can significantly reduce checkpointing time in comparison to
full checkpoints, at the cost of a
    +(potentially) longer recovery time. The core idea is that incremental checkpoints only
record changes in state since the
    +previously-completed checkpoint instead of producing a full, self-contained backup of
the state backend. In this way,
    +incremental checkpoints can build upon previous checkpoints.
    +
    +RocksDBStateBackend is currently the only backend that supports incremental checkpoints.
    +
    +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating
over time. As a result, the
    +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints
are eventually subsumed and
    +pruned automatically.
    +
    +``While we strongly encourage the use of incremental checkpoints for Flink jobs with
large state, please note that this is
    +a new feature and currently not enabled by default``.
    +
    +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding
boolean flag in the
    +constructor set to `true`, e.g.:
    +
    +```java
    +   RocksDBStateBackend backend =
    +       new RocksDBStateBackend(filebackend, true);
    +```
    +
    +### Use-case for Incremental Checkpoints
    +
    +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint
represents a consistent
    +snapshot of the distributed state of a Flink job from which the system can recover in
case of a software or machine
    +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html). 
    +
    +Flink creates checkpoints periodically to track the progress of a job so that, in case
of failure, only those
    +(hopefully few) *events that have been processed after the last completed checkpoint*
must be reprocessed from the data
    +source. The number of events that must be reprocessed has implications for recovery time,
and so for fastest recovery,
    +we want to *take checkpoints as often as possible*.
    +
    +However, checkpoints are not without performance cost and can introduce *considerable
overhead* to the system. This
    +overhead can lead to lower throughput and higher latency during the time that checkpoints
are created. One reason is
    +that, traditionally, each checkpoint in Flink always represented the *complete state*
of the job at the time of the
    +checkpoint, and all of the state had to be written to stable storage (typically some
distributed file system) for every
    +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint
can obviously create
    +significant load for the I/O and network subsystems, on top of the normal load from pipeline’s
data processing work.
    +
    +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery
time and checkpointing
    +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this
is exactly the problem that
    +incremental checkpoints solve.
    +
    +
    +### Basics of Incremental Checkpoints
    +
    +In this section, for the sake of getting the concept across, we will briefly discuss
the idea behind incremental
    +checkpoints in a simplified manner.
    +
    +Our motivation for incremental checkpointing stemmed from the observation that it is
often wasteful to write the full
    +state of a job for every single checkpoint. In most cases, the state between two checkpoints
is not drastically
    +different, and only a fraction of the state data is modified and some new data added.
Instead of writing the full state
    +into each checkpoint again and again, we could record only changes in state since the
previous checkpoint. As long as we
    +have the previous checkpoint and the state changes for the current checkpoint, we can
restore the full, current state
    +for the job. This is the basic principle of incremental checkpoints, that each checkpoint
can build upon a history of
    +previous checkpoints to avoid writing redundant information.
    +
    +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full
checkpointing.
    +
    +The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``, a full
checkpoint is simply a copy of the whole
    +state.
    +
    +<p class="text-center">
    +   <img alt="Figure 1: Full Checkpoints vs Incremental Checkpoints" width="80%" src="{{
site.baseurl }}/fig/incremental_cp_basic.svg"/>
    +</p>
    +
    +With incremental checkpointing, each checkpoint contains only the state change since
the previous checkpoint.
    +
    +* For the first checkpoint ``CP 1``, there is no difference between a full checkpoint
and the complete state at the time the
    +checkpoint is written.
    +
    +* For ``CP 2``, incremental checkpointing will write only the changes since ``CP 1``:
the value for ``K1`` has changed and a mapping
    +for ``K3`` was added.
    +
    +* For checkpoint ``CP 3``, incremental checkpointing only records the changes since ``CP
2``.
    +
    +Notice that, unlike in full checkpoints, we also must record changes that delete state
in an incremental checkpoint, as
    +in the case of ``K0``. In this simple example, we can see how incremental checkpointing
can reduce the amount of data that
    +is written for each checkpoint.
    +
    +The next interesting question: how does restoring from incremental checkpoints compare
to restoring from full
    +checkpoints? Restoring a full checkpoint is as simple as loading all the data from the
checkpoint back into the job
    +state because full checkpoints are self-contained. In contrast, to restore an incremental
checkpoint, we need to replay
    +the history of all incremental checkpoints that are in the reference chain of the checkpoint
we are trying to restore.
    +In our example from Figure 1, those connections are represented by the orange arrows.
If we want to restore ``CP 3``, as a
    +first step, we need to apply all the changes of ``CP 1`` to the empty initial job state.
On top of that, we apply the
    +changes from ``CP 2``, and then the changes from ``CP 3``.
    +
    +A different way to think about basic incremental checkpoints is to imagine it as a changelog
with some aggregation. What
    +we mean by aggregated is that for example, if the state under key ``K1`` is overwritten
multiple times between two
    +consecutive checkpoints, we will only record the latest state value at the time in the
checkpoint. All previous changes
    +are thereby subsumed.
    +
    +This leads us to the discussion of the potential *disadvantages* of incremental checkpoints
compared to full checkpoints.
    +While we save work in writing checkpoints, we have to do more work in reading the data
from multiple checkpoints on
    +recovery. Furthermore, we can no longer simply delete old checkpoints because new checkpoints
rely upon them and the
    +history of old checkpoints can grow indefinitely over time (like a changelog).
    +
    +At this point, it looks like we didn’t gain too much from incremental checkpoints because
we are again trading between
    +checkpointing overhead and recovery time. Fortunately, there are ways to improve on this
naive approach to recovery. One
    +simple and obvious way to restrict recovery time and the length of the checkpoint history
is to write a full checkpoint
    +from time to time. We can drop all checkpoints prior to the most recent full checkpoint,
and the full checkpoint can
    +serve as a new basis for future incremental checkpoints.
    +
    +Our actual implementation of incremental checkpoints in Flink is more involved and designed
to address a number of
    +different tradeoffs. Our incremental checkpointing restricts the size of the checkpoint
history and therefore never
    +needs take a full checkpoint to keep recovery efficiently! We present more detail about
this in the next section, but
    +the high level idea is to accept a small amount of redundant state writing to incrementally
introduce
    +merged/consolidated replacements for previous checkpoints. For now, you can think about
Flink’s approach as stretching
    +out and distributing the consolidation work over several incremental checkpoints, instead
of doing it all at once in a
    +full checkpoint. Every incremental checkpoint can contribute a share for consolidation.
We also track when old
    +checkpoints data becomes obsolete and then prune the checkpoint history over time.
    +
    +### Incremental Checkpoints in Flink
    +
    +In the previous section, we discussed that incremental checkpointing is mainly about
recording all effective state
    +modifications between checkpoints. This poses certain requirements on the underlying
data structures in the state
    +backend that manages the job’s state. It goes without saying that the data structure
should always provide a decent
    +read-write performance to keep state access swift. At the same time, for incremental
checkpointing, the state backend
    +must be able to efficiently detect and iterate state modifications since the previous
checkpoint.
    --- End diff --
    
    and iterate over state modifications


> Improve and enhance documentation for incremental checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-7449
>                 URL: https://issues.apache.org/jira/browse/FLINK-7449
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>    Affects Versions: 1.4.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Minor
>
> We should provide more details about incremental checkpoints in the documentation.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message