flink-issues mailing list archives

From alpinegizmo <...@git.apache.org>
Subject [GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Date Tue, 19 Sep 2017 13:33:21 GMT
Github user alpinegizmo commented on a diff in the pull request:

    --- Diff: docs/ops/state/checkpoints.md ---
    @@ -99,3 +99,296 @@ above).
     $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
    +## Incremental Checkpoints
    +### Synopsis
    +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a
    +(potentially) longer recovery time. The core idea is that incremental checkpoints only record the changes in state since the
    +previously completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way,
    +incremental checkpoints build upon previous checkpoints.
    +`RocksDBStateBackend` is currently the only backend that supports incremental checkpoints.
    +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the
    +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and
    +pruned automatically.
    +**While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is
    +a new feature and is currently not enabled by default.**
    +To enable this feature, instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the constructor set
    +to `true`, for example:
    +
    +    // filebackend: the state backend or URI used for checkpoint storage; true enables incremental checkpoints
    +    RocksDBStateBackend backend =
    +        new RocksDBStateBackend(filebackend, true);
    +
    +### Use Case for Incremental Checkpoints
    +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism, and each checkpoint represents a consistent
    +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine
    +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html)).
    +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those
    +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data
    +source. The number of events that must be reprocessed has implications for recovery time, and so for the fastest recovery,
    +we want to *take checkpoints as often as possible*.
    +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This
    +overhead can lead to lower throughput and higher latency while checkpoints are being created. One reason is
    +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the
    +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every
    +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can create significant load
    +on the I/O and network subsystems, on top of the normal load from the pipeline’s data processing work.
    +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing
    +overhead: fast recovery and low checkpointing overhead were conflicting goals. This is exactly the problem that
    +incremental checkpoints solve.
    +### Basics of Incremental Checkpoints
    +In this section, for the sake of getting the concept across, we briefly discuss the idea behind incremental
    +checkpoints in a simplified manner.
    +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full
    +state of a job for every single checkpoint. In most cases, the state does not differ drastically between two checkpoints:
    +only a fraction of the state data is modified, and some new data is added. Instead of writing the full state
    +into each checkpoint again and again, we can record only the changes in state since the previous checkpoint. As long as we
    +have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state
    +of the job. This is the basic principle of incremental checkpoints: each checkpoint can build upon a history of
    +previous checkpoints to avoid writing redundant information.
    +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing.
    +The state of the job evolves over time, and for checkpoints ``CP 1`` to ``CP 2``, a full checkpoint is simply a copy of
    +the whole state.
    +<p class="text-center">
    +   <img alt="Figure 1: Full Checkpoints vs Incremental Checkpoints" width="80%" src="{{ site.baseurl }}/fig/incremental_cp_basic.svg"/>
    +With incremental checkpointing, each checkpoint contains only the state changes since the previous checkpoint.
    +* For the first checkpoint ``CP 1``, there is no difference between an incremental and a full checkpoint: both contain
    +the complete state at the time the checkpoint is written.
    +* For ``CP 2``, incremental checkpointing writes only the changes since ``CP 1``: the value for ``K1`` has changed and a
    +mapping for ``K3`` was added.
    +* For checkpoint ``CP 3``, incremental checkpointing only records the changes since ``CP 2``.
    +Notice that, unlike in full checkpoints, we must also record changes that delete state in an incremental checkpoint, as
    +in the case of ``K0``. In this simple example, we can see how incremental checkpointing can reduce the amount of data that
    +is written for each checkpoint.
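To make the delta idea concrete, here is a minimal sketch that assumes job state is modeled as a plain `Map<String, String>` without null values, and that an incremental checkpoint is simply a delta map in which a `null` value marks a deleted key (as for ``K0`` above). `IncrementalDelta` and `computeDelta` are hypothetical names for illustration only; Flink does not compute deltas this way (it tracks RocksDB files, as described further below), and the values in any example are made up because Figure 1's contents are not given in the text.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified model: state is a Map<String, String> (no null values),
// and an incremental checkpoint is a delta map where a null value marks a deletion.
final class IncrementalDelta {

    /** Returns only what changed between two full state snapshots. */
    static Map<String, String> computeDelta(Map<String, String> previous,
                                            Map<String, String> current) {
        Map<String, String> delta = new HashMap<>();
        // New or modified keys: record the current value.
        for (Map.Entry<String, String> e : current.entrySet()) {
            if (!Objects.equals(previous.get(e.getKey()), e.getValue())) {
                delta.put(e.getKey(), e.getValue());
            }
        }
        // Keys that disappeared: record an explicit deletion (tombstone).
        for (String key : previous.keySet()) {
            if (!current.containsKey(key)) {
                delta.put(key, null);
            }
        }
        return delta;
    }
}
```

Unchanged keys never appear in the delta, which is where the savings over a full checkpoint come from.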
    +The next interesting question is how restoring from incremental checkpoints compares to restoring from full
    +checkpoints. Restoring a full checkpoint is as simple as loading all the data from the checkpoint back into the job
    +state, because full checkpoints are self-contained. In contrast, to restore an incremental checkpoint, we need to replay
    +the history of all incremental checkpoints that are in the reference chain of the checkpoint we are trying to restore.
    +In our example from Figure 1, those connections are represented by the orange arrows. If we want to restore ``CP 3``, as a
    +first step, we need to apply all the changes of ``CP 1`` to the empty initial job state. On top of that, we apply the
    +changes from ``CP 2``, and then the changes from ``CP 3``.
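The replay described above can be sketched as follows, again assuming the simplified model in which each incremental checkpoint is a delta map whose `null` values mark deletions, and the chain is ordered oldest first. `ChainRestore` is a hypothetical name; this illustrates the naive restore, not Flink's recovery code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: each incremental checkpoint is a delta map in which a
// null value marks a deleted key; the chain is ordered oldest (CP 1) first.
final class ChainRestore {

    static Map<String, String> restore(List<Map<String, String>> chain) {
        Map<String, String> state = new HashMap<>(); // empty initial job state
        for (Map<String, String> delta : chain) {
            for (Map.Entry<String, String> change : delta.entrySet()) {
                if (change.getValue() == null) {
                    state.remove(change.getKey());                 // replay a deletion
                } else {
                    state.put(change.getKey(), change.getValue()); // replay a put
                }
            }
        }
        return state;
    }
}
```

The cost of a restore grows with the length of the chain, which is exactly the disadvantage discussed next.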
    +A different way to think about basic incremental checkpoints is to imagine them as a changelog with some aggregation.
    +By aggregated we mean that, for example, if the state under key ``K1`` is overwritten multiple times between two
    +consecutive checkpoints, the checkpoint records only the latest value at the time of the checkpoint. All previous changes
    +are thereby subsumed.
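A "changelog with aggregation" can be sketched as a buffer that keeps only the latest change per key between two checkpoints. This is a hypothetical illustration (`AggregatingChangelog` and `takeCheckpointDelta` are invented names), using the same convention that a `null` value marks a deletion.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a changelog with aggregation: between two checkpoints,
// only the latest change per key is kept; a null value marks a deletion.
final class AggregatingChangelog {
    private Map<String, String> pending = new HashMap<>();

    void put(String key, String value) {
        pending.put(key, value); // overwrites (subsumes) any earlier change to this key
    }

    void delete(String key) {
        pending.put(key, null);  // a deletion also subsumes earlier changes
    }

    /** Returns the aggregated changes for the next checkpoint and starts a new interval. */
    Map<String, String> takeCheckpointDelta() {
        Map<String, String> delta = pending;
        pending = new HashMap<>();
        return delta;
    }
}
```

Writing ``K1`` three times between checkpoints therefore contributes a single entry, holding the last value, to the checkpoint.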
    +This leads us to the potential *disadvantages* of incremental checkpoints compared to full checkpoints.
    +While we save work when writing checkpoints, we have to do more work reading the data from multiple checkpoints on
    +recovery. Furthermore, we can no longer simply delete old checkpoints, because new checkpoints rely upon them, and the
    +history of old checkpoints can grow indefinitely over time (like a changelog).
    +At this point, it looks like we didn’t gain much from incremental checkpoints, because we are again trading off
    +checkpointing overhead against recovery time. Fortunately, there are ways to improve on this naive approach to recovery. One
    +simple and obvious way to restrict recovery time and the length of the checkpoint history is to write a full checkpoint
    +from time to time. We can then drop all checkpoints prior to the most recent full checkpoint, and the full checkpoint
    +serves as the new basis for future incremental checkpoints.
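The naive bounding strategy just described can be sketched as follows, assuming (hypothetically) that every `fullEvery`-th checkpoint is a full one and that checkpoint ids start at 1. This is not how Flink does it; Flink's approach is described in the next section.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the naive approach: every fullEvery-th checkpoint is full,
// the rest are incremental. Restoring checkpoint `target` (ids start at 1) only needs
// the chain starting at the most recent full checkpoint at or before it.
final class PeriodicFullCheckpoints {

    static boolean isFull(int checkpointId, int fullEvery) {
        return (checkpointId - 1) % fullEvery == 0; // CP 1 is always full
    }

    static List<Integer> restoreChain(int target, int fullEvery) {
        int base = target;
        while (!isFull(base, fullEvery)) {
            base--; // walk back to the last full checkpoint
        }
        List<Integer> chain = new ArrayList<>();
        for (int id = base; id <= target; id++) {
            chain.add(id);
        }
        return chain;
    }
}
```

With `fullEvery = 3`, restoring checkpoint 6 replays checkpoints 4, 5, and 6, and everything before checkpoint 4 can be dropped. The price is a full, expensive write every third checkpoint.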
    +Our actual implementation of incremental checkpoints in Flink is more involved and designed to address a number of
    +different tradeoffs. It restricts the size of the checkpoint history and therefore never needs to take a full checkpoint
    +to keep recovery efficient. We present more detail about this in the next section, but the high-level idea is to accept
    +a small amount of redundant state writing in order to incrementally introduce merged/consolidated replacements for
    +previous checkpoints. For now, you can think of Flink’s approach as stretching out and distributing the consolidation
    +work over several incremental checkpoints, instead of doing it all at once in a full checkpoint. Every incremental
    +checkpoint can contribute a share of the consolidation. We also track when old checkpoint data becomes obsolete and
    +prune the checkpoint history over time.
    +### Incremental Checkpoints in Flink
    +In the previous section, we discussed that incremental checkpointing is mainly about recording all effective state
    +modifications between checkpoints. This poses certain requirements on the underlying data structures in the state
    +backend that manages the job’s state. The data structure should, of course, always provide decent read-write performance
    +to keep state access swift. At the same time, for incremental checkpointing, the state backend must be able to
    +efficiently detect and iterate over state modifications since the previous checkpoint.
    +One data structure that is very well suited for this use case is the *log-structured merge (LSM) tree*, which is the core
    +data structure in Flink’s RocksDB-based state backend. Without going into too much detail, the write path of RocksDB
    +already roughly resembles a changelog with some pre-aggregation, which perfectly fits the needs of incremental
    +checkpoints. On top of that, RocksDB also has a *compaction mechanism* that can be regarded as an elaborate form of log
    +compaction.
    +#### RocksDB Snapshots as a Foundation
    +In a nutshell, *RocksDB is a key-value store based on LSM trees*. The write path of RocksDB first collects all changes as
    +key-value pairs in a mutable, in-memory buffer called the memtable. Writes to the same key in a memtable can simply replace
    +previous values (this is the pre-aggregation aspect). Once the memtable is full, it is written to disk completely, with
    +all entries sorted by their key. Typically, RocksDB also applies lightweight compression (e.g. Snappy) in the write
    +process. After the memtable has been written to disk, it is immutable and is called a *sorted-string table
    +(sstable)*. Figure 2 illustrates these basic RocksDB internals.
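The write path described above can be illustrated with a toy model. This is not RocksDB's API: `ToyLsmWritePath` and its entry-count threshold `maxEntries` are hypothetical (real RocksDB flushes based on memtable memory size), and compression and deletes are not modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Toy model of the RocksDB write path (not RocksDB's API): a mutable, sorted
// in-memory memtable that is flushed to an immutable, key-sorted "sstable"
// once it holds maxEntries distinct keys.
final class ToyLsmWritePath {
    private final int maxEntries;
    private TreeMap<String, String> memtable = new TreeMap<>();
    private final List<SortedMap<String, String>> sstables = new ArrayList<>();

    ToyLsmWritePath(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    void put(String key, String value) {
        memtable.put(key, value); // same key: replace in place (pre-aggregation)
        if (memtable.size() >= maxEntries) {
            flush();
        }
    }

    void flush() {
        if (memtable.isEmpty()) {
            return;
        }
        // TreeMap keeps entries sorted by key; the read-only view makes the sstable immutable.
        sstables.add(Collections.unmodifiableSortedMap(memtable));
        memtable = new TreeMap<>();
    }

    List<SortedMap<String, String>> sstables() {
        return Collections.unmodifiableList(sstables);
    }
}
```

Two writes to the same key before a flush occupy a single memtable slot, and every flushed table is sorted and never modified again.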
    +<p class="text-center">
    +   <img alt="Figure 2: RocksDB architecture (simplified)" width="75%" src="{{ site.baseurl
    +To avoid the problem of collecting an infinite number of sstables over time, a background task called compaction
    +constantly merges sstables to consolidate potential duplicate entries for each key from the merged tables. Once some
    +sstables have been merged, those original sstables become obsolete and are deleted by RocksDB. The newly created merged
    +sstable contains all their net information. We show an example of such a merge in Figure 3. ``SSTable-1`` and ``SSTable-2``
    +contain some duplicate mappings for certain keys, such as key ``9``. The system can apply a sort-merge strategy in which
    +the newer mappings from ``SSTable-2`` overwrite mappings for keys that also existed in ``SSTable-1``. For key ``7``, we can also
    +see a delete (or antimatter) entry that, when merged, results in omitting key ``7`` from the merge result. Notice that the
    +merge in RocksDB is typically generalised to a multi-way merge. We won’t go into details about the read path here,
    +because it is irrelevant for the approach that we want to present. You can find more details about RocksDB internals in
    +its [documentation](http://rocksdb.org/).
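The merge semantics from the Figure 3 description can be sketched as follows. This is a toy, not RocksDB's implementation: `ToyCompaction` is a hypothetical name, tables map keys to `Optional` values with `Optional.empty()` as the delete entry, and for brevity it uses a `TreeMap` rather than a streaming sort-merge (same result, different mechanics). It also assumes the merged tables are the only ones that can contain these keys; otherwise dropping delete entries would be unsafe, which RocksDB handles more carefully. The values are made up.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Toy sketch of compaction (not RocksDB's implementation). Each table maps
// key -> Optional value; Optional.empty() is a delete (antimatter) entry.
// Tables are ordered oldest first, so newer mappings overwrite older ones.
final class ToyCompaction {

    static TreeMap<Integer, String> merge(List<Map<Integer, Optional<String>>> oldestFirst) {
        TreeMap<Integer, Optional<String>> merged = new TreeMap<>();
        for (Map<Integer, Optional<String>> table : oldestFirst) {
            merged.putAll(table); // newer table wins for duplicate keys
        }
        TreeMap<Integer, String> result = new TreeMap<>();
        for (Map.Entry<Integer, Optional<String>> e : merged.entrySet()) {
            // Keys whose newest entry is a delete are omitted from the result.
            e.getValue().ifPresent(v -> result.put(e.getKey(), v));
        }
        return result;
    }
}
```

For example, if the older table holds keys 2, 7, and 9, and the newer one holds a delete for 7 and new values for 9 and 10, the result contains 2, 9 (newer value), and 10.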
    +<p class="text-center">
    +   <img alt="Figure 3: Merging SSTable files" width="50%" src="{{ site.baseurl }}/fig/sstable_merge.png"/>
    +#### Integrating RocksDB’s Snapshots with Flink’s Checkpoints
    +Flink’s incremental checkpointing logic operates on top of this mechanism that RocksDB provides. From a high-level
    +perspective, when taking a checkpoint, we track which sstable files have been created and deleted by RocksDB since the
    +previous checkpoint. This is sufficient for figuring out the effective state changes, because sstables are immutable. Our
    +backend remembers the sstables that already existed in the last completed checkpoint in order to figure out which files
    +have been created or deleted in the current checkpoint interval. With this in mind, we now explain the details of
    +checkpointing state in our RocksDB backend.
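Because sstables are immutable, comparing file names is enough to classify them. The following hypothetical sketch (`SstableDiff` is an invented name, not a Flink class) splits the current local files into created and reused ones, plus files deleted since the last completed checkpoint.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: sstables are immutable, so comparing file names between the
// last completed checkpoint and the current local RocksDB directory is enough.
final class SstableDiff {
    final Set<String> created = new TreeSet<>(); // must be uploaded
    final Set<String> reused = new TreeSet<>();  // can be re-referenced
    final Set<String> deleted = new TreeSet<>(); // removed by compaction, no longer referenced

    SstableDiff(Set<String> lastCompleted, Set<String> currentLocal) {
        for (String file : currentLocal) {
            if (lastCompleted.contains(file)) {
                reused.add(file);
            } else {
                created.add(file);
            }
        }
        for (String file : lastCompleted) {
            if (!currentLocal.contains(file)) {
                deleted.add(file);
            }
        }
    }
}
```

Applied to the ``CP 3`` situation in the example below, this yields ``sstable-(1,2,3)`` and ``sstable-(5)`` as created, ``sstable-(4)`` as reused, and ``sstable-(1)`` to ``sstable-(3)`` as deleted.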
    +In the first step, Flink triggers a flush in RocksDB so that all memtables are forced into sstables on disk, and all
    +sstables are hard-linked into a local temporary directory. This step of the checkpoint is synchronous to the processing
    +pipeline; all further steps are performed asynchronously and do not block processing.
    +Then, all new sstables (w.r.t. the previous checkpoint) are copied to stable storage (e.g. HDFS) and referenced in the
    +new checkpoint. All sstables that already existed in the previous checkpoint are *not copied to stable storage again* but
    +simply re-referenced. Deleted files simply no longer receive a reference in the new checkpoint. Notice that deleted
    +sstables in RocksDB are always the result of compaction. This is how Flink’s incremental checkpoints can prune the
    +checkpoint history: old sstables are eventually replaced by the sstable that results from merging them. Note that, in a
    +strict sense of tracking changes between checkpoints, this uploading of consolidated tables is redundant work. But it is
    +performed incrementally, typically adding only a small amount of overhead to some checkpoints. We consider that overhead
    +a worthwhile investment, because it allows us to keep a shorter history of checkpoints to consider during recovery.
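The asynchronous part of such a checkpoint can be sketched as building the new checkpoint's file references. Everything here is hypothetical: `IncrementalSnapshot`, the `upload` callback standing in for copying a file to stable storage, and the `cp-<id>/<uuid>` naming scheme, which only mirrors the per-checkpoint directories and unique names described in the example below. Flink's actual implementation differs in its details.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.BiFunction;

// Hypothetical sketch of the asynchronous part of an incremental checkpoint:
// new sstables are uploaded, sstables known from the last completed checkpoint
// are re-referenced, and files deleted by compaction simply get no reference.
final class IncrementalSnapshot {

    /**
     * @param checkpointId  id of the checkpoint being taken
     * @param localFiles    sstable names hard-linked for this checkpoint
     * @param lastCompleted local sstable name -> remote path, from the last completed checkpoint
     * @param upload        hypothetical uploader: (local file, target path) -> remote path
     * @return local sstable name -> remote path referenced by the new checkpoint
     */
    static Map<String, String> snapshot(long checkpointId,
                                        Set<String> localFiles,
                                        Map<String, String> lastCompleted,
                                        BiFunction<String, String, String> upload) {
        Map<String, String> references = new TreeMap<>();
        for (String file : localFiles) {
            String existing = lastCompleted.get(file);
            if (existing != null) {
                references.put(file, existing); // re-reference, no copy
            } else {
                // Unique name under the directory of the checkpoint that uploads it.
                String target = "cp-" + checkpointId + "/" + UUID.randomUUID();
                references.put(file, upload.apply(file, target));
            }
        }
        return references;
    }
}
```

Over the first two checkpoints of the example below, this uploads four files in total, and the ``CP 2`` references to ``sstable-(1)`` and ``sstable-(2)`` still point into ``cp-1``.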
    +Another interesting point is how Flink can determine when it is safe to delete a shared file. Our solution works as
    +follows: we keep a reference count for each sstable file that we copied to stable storage. These counts are maintained
    +by the checkpoint coordinator on the job master in a *shared state registry*. This shared registry tracks the number of
    +checkpoints that reference a shared file in stable storage, e.g. an uploaded sstable. When a checkpoint is completed, the
    +checkpoint coordinator simply increases the counts for all files that are referenced in the new checkpoint by 1. If a
    +checkpoint is dropped, the counts of all files it referenced are decreased by 1. When a count goes down to 0, the shared
    +file is deleted from stable storage, because it is no longer used by any checkpoint.
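The reference counting can be sketched with a small, hypothetical class. `RefCountRegistry` is not Flink's `SharedStateRegistry` API; keys stand for shared files in stable storage, and the `deletedFromStorage` list stands in for actually deleting a file.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the shared state registry (not Flink's API).
// Keys identify shared files in stable storage; values count referencing checkpoints.
final class RefCountRegistry {
    private final Map<String, Integer> counts = new HashMap<>();
    private final List<String> deletedFromStorage = new ArrayList<>();

    /** Called when a checkpoint completes: +1 for every referenced file. */
    void register(List<String> referencedFiles) {
        for (String file : referencedFiles) {
            counts.merge(file, 1, Integer::sum);
        }
    }

    /** Called when a checkpoint is dropped: -1, and delete files whose count reaches 0. */
    void unregister(List<String> referencedFiles) {
        for (String file : referencedFiles) {
            int remaining = counts.merge(file, -1, Integer::sum);
            if (remaining == 0) {
                counts.remove(file);
                deletedFromStorage.add(file); // stands in for deleting the file from stable storage
            }
        }
    }

    int count(String file) {
        return counts.getOrDefault(file, 0);
    }

    List<String> deletedFromStorage() {
        return deletedFromStorage;
    }
}
```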
    +<p class="text-center">
    +   <img alt="Figure 4: Flink incremental checkpointing example" width="100%" src="{{ site.baseurl }}/fig/incremental_cp_impl_example.svg"/>
    +To make this idea more concrete, Figure 4 shows an example run over 4 incremental checkpoints. We illustrate what is
    +happening for one subtask (here: subtask index 1) of one operator (called ``Operator-2``) with keyed state. For this
    +example, we assume that the number of retained checkpoints is configured to 2, so that Flink always keeps the two latest
    +checkpoints and older checkpoints are pruned. The columns show, for each checkpoint, the state of the local RocksDB
    +instance (i.e. the current sstable files), the files that are referenced in the checkpoint, and the counts in the shared
    +state registry after the checkpoint is completed.
    +For checkpoint 1 (``CP 1``), we can see that the local RocksDB directory contains two sstable files, which are considered
    +new and are uploaded to stable storage. We upload the files under the directory of the checkpoint that first uploaded
    +them, in this case ``cp-1``, and use unique filenames, because they could otherwise collide with identical sstable names
    +from other subtasks. When the checkpoint completes, two entries are created in the shared state registry, one for each
    +newly uploaded file, and their counts are set to 1. Notice that the key in the shared state registry is a composite of
    +operator, subtask, and the original sstable file name. In the actual implementation, the shared state registry also keeps
    +a mapping from the key to the file path in stable storage besides the count, which is not shown to keep the graphic clearer.
    +At the time of the second checkpoint, two new sstable files have been created by RocksDB, and the two older sstable files
    +from the previous checkpoint still exist. For checkpoint ``CP 2``, Flink must now upload the two new files to stable
    +storage and can re-reference ``sstable-(1)`` and ``sstable-(2)`` from the previous checkpoint. We can see that the file
    +references to previously existing sstable files point to existing files in the ``cp-1`` directory, and references to new
    +sstable files point to the newly uploaded files in directory ``cp-2``. When the checkpoint completes, the counts for all
    +referenced files are increased by 1.
    +For checkpoint ``CP 3``, we see that RocksDB’s compaction has merged ``sstable-(1)``, ``sstable-(2)``, and ``sstable-(3)``
    +into ``sstable-(1,2,3)``. This merged table contains the same net information as the source files and eliminates any
    +duplicate entries for each key that might have existed across the three source files. The source files of the merge have
    +been deleted, ``sstable-(4)`` still exists, and one additional ``sstable-(5)`` was created. For the checkpoint, we need to
    +upload the new files ``sstable-(1,2,3)`` and ``sstable-(5)`` and can re-reference ``sstable-(4)`` from a previous
    +checkpoint. When this checkpoint completes, two things happen at the checkpoint coordinator, in the following order:
    +* First, the checkpoint registers the referenced files, increasing the count of those files by 1.
    +* Then, the older checkpoint ``CP 1`` is deleted, because we have configured the number of retained checkpoints to 2.
    +* As part of the deletion, the counts for all files referenced by ``CP 1`` (``sstable-(1)`` and ``sstable-(2)``) are
    +decreased by 1.
    +Even though ``CP 1`` is logically deleted, we can see that all the files it created still have a reference count greater
    +than 0, so we cannot yet physically delete them from stable storage. Our graphic shows the counts after ``CP 3`` was
    +registered and ``CP 1`` was deleted.
    +For our last checkpoint, ``CP 4``, RocksDB has merged ``sstable-(4)``, ``sstable-(5)``, and another ``sstable-(6)``, which
    +was never observed at the time of a checkpoint, into ``sstable-(4,5,6)``. This file ``sstable-(4,5,6)`` is new for the
    +checkpoint and must be uploaded. In ``CP 4``, we reference it together with ``sstable-(1,2,3)``, which was already known.
    +In the checkpoint coordinator’s shared state registry, the counts for ``sstable-(1,2,3)`` and ``sstable-(4,5,6)`` are
    +increased by 1. Then, ``CP 2`` is deleted as part of our retention policy. This decreases the counts for ``sstable-(1)``,
    +``sstable-(2)``, ``sstable-(3)``, and ``sstable-(4)`` by 1. This means that the counts for ``sstable-(1)``, ``sstable-(2)``,
    +and ``sstable-(3)`` have now dropped to 0, and they will be physically deleted from stable storage. The final counts for
    +``CP 4`` after this step are shown in the figure. This concludes our example of a sequence of 4 incremental checkpoints.
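The walkthrough can be replayed with a small, hypothetical simulation (`RetentionWalkthrough` is an invented name). It models only reference counting with 2 retained checkpoints, and omits uploads, file paths, and the composite operator/subtask part of the keys. Each completed checkpoint is first registered, then the oldest retained checkpoint is dropped if the limit is exceeded, in the order described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical replay of the four-checkpoint example. Only reference counting is
// modeled; uploads and file paths are omitted.
final class RetentionWalkthrough {
    final Map<String, Integer> counts = new TreeMap<>();
    final List<String> physicallyDeleted = new ArrayList<>();
    private final Deque<List<String>> retained = new ArrayDeque<>();
    private final int maxRetained;

    RetentionWalkthrough(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void completeCheckpoint(String... referencedFiles) {
        List<String> files = Arrays.asList(referencedFiles);
        // 1. Register the new checkpoint's references.
        for (String f : files) {
            counts.merge(f, 1, Integer::sum);
        }
        retained.addLast(files);
        // 2. Drop the oldest checkpoint(s) beyond the retention limit.
        while (retained.size() > maxRetained) {
            for (String f : retained.removeFirst()) {
                if (counts.merge(f, -1, Integer::sum) == 0) {
                    counts.remove(f);
                    physicallyDeleted.add(f);
                }
            }
        }
    }
}
```

Feeding it the references of ``CP 1`` to ``CP 4`` from the text leaves ``sstable-(1)``, ``sstable-(2)``, and ``sstable-(3)`` physically deleted, with a count of 2 for ``sstable-(1,2,3)`` and 1 for ``sstable-(4)``, ``sstable-(5)``, and ``sstable-(4,5,6)``. The last three remain because the retained ``CP 3`` or ``CP 4`` still references them.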
    +#### Resolving Races for Concurrent Checkpoints
    +We sometimes also have to resolve race conditions between concurrent checkpoints in incremental checkpointing. Flink can
    +execute multiple checkpoints in parallel, and new checkpoints can start before the checkpoint coordinator has confirmed
    +previous checkpoints as completed to the backend. We need to consider this in our reasoning about which previous
    +checkpoint can serve as the basis for a new incremental checkpoint. We are only allowed to reference shared state from a
    +confirmed checkpoint, because otherwise we might attempt to reference a shared file that could still be deleted, e.g.
    +if the assumed predecessor checkpoint eventually fails.
    +This can lead to a situation where multiple checkpoints regard the same sstable files in RocksDB as new, because no
    +checkpoint that attempted to upload and register those sstable files has been confirmed yet. To be on the safe side,
    +checkpoints must always upload such files to stable storage independently, under unique names, until the sstable files
    +have been registered by a completed checkpoint and the confirmation has reached the backend. Otherwise, pending previous
    +checkpoints might still fail, in which case their newly uploaded files are deleted, and future checkpoints would
    +potentially attempt to reference deleted data.
    +Sometimes, this upload policy results in the same sstable file being uploaded more than once, from different
    +checkpoints. However, we can later de-duplicate the sstable files in the checkpoint coordinator, because they are
    +accounted under the same key. Only the copy that was uploaded by the first confirmed checkpoint survives, and we can
    +replace references to the duplicates in all checkpoints that register afterwards.
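The de-duplication rule can be sketched as follows: the first registration of a key fixes the surviving copy, and later registrations of a different upload for the same key are redirected to it. `DedupRegistry`, its `discardFile` callback, and the key format `Operator-2/1/sstable-(7)` are hypothetical illustrations of the composite key described above, not Flink's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of de-duplication in the shared state registry: the first
// confirmed checkpoint to register a key wins; later uploads of the same sstable
// are discarded and their references are redirected to the surviving copy.
final class DedupRegistry {
    private final Map<String, String> survivingCopy = new HashMap<>(); // key -> remote path
    private final Map<String, Integer> counts = new HashMap<>();
    private final Consumer<String> discardFile;

    DedupRegistry(Consumer<String> discardFile) {
        this.discardFile = discardFile;
    }

    /** Registers one reference and returns the remote path the checkpoint should use. */
    String register(String key, String uploadedPath) {
        String existing = survivingCopy.putIfAbsent(key, uploadedPath);
        counts.merge(key, 1, Integer::sum);
        if (existing != null && !existing.equals(uploadedPath)) {
            discardFile.accept(uploadedPath); // redundant duplicate upload
            return existing;
        }
        return uploadedPath;
    }

    int count(String key) {
        return counts.getOrDefault(key, 0);
    }
}
```

A re-reference that passes the already-registered path is simply counted; only a second, different upload of the same key is discarded.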
    +#### Recovering the Shared State Registry under Job Manager Failure
    +During recovery from a job manager failure, the shared state registry counts are simply recalculated from the completed
    +checkpoint store: we clear all counts and re-register all checkpoints contained in the completed checkpoint store with
    +the registry.
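The recalculation amounts to counting references again from scratch. In this hypothetical sketch (`RegistryRecovery` is an invented name), each completed checkpoint is represented only by the list of shared files it references.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: after a job manager failover, the reference counts are
// recomputed from the checkpoints in the completed checkpoint store.
final class RegistryRecovery {

    /** Each element lists the shared files referenced by one completed checkpoint. */
    static Map<String, Integer> rebuildCounts(List<List<String>> completedCheckpoints) {
        Map<String, Integer> counts = new HashMap<>(); // start from cleared counts
        for (List<String> checkpoint : completedCheckpoints) {
            for (String file : checkpoint) {
                counts.merge(file, 1, Integer::sum); // re-register every reference
            }
        }
        return counts;
    }
}
```

For the example above, rebuilding from the retained ``CP 3`` and ``CP 4`` gives the same counts the registry held before the failure.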
    +### Known Limitations of Incremental Checkpointing
    +Incremental checkpoints are only available for checkpoints, not for savepoints. Savepoints are always self-contained
    +and record the full state of a job. However, it is possible to externalize incremental checkpoints, which is a way to use
    +them for manual restarts of a job.
    +Rescaling the parallelism of a job is officially only supported through savepoints, not from incremental checkpoints.
    +(Unofficially, it should still be possible, though.)
    +Users should not manually delete non-empty checkpoint directories when working with incremental checkpoints. A newer
    +checkpoint might still reference files from the directory of an older checkpoint.
    --- End diff --
    doctor -> directory

