Subject: [jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171689#comment-16171689 ]

ASF GitHub Bot commented on FLINK-7449:
---------------------------------------

Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4543#discussion_r139687055

--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
```sh
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
```

## Incremental Checkpoints

### Synopsis

Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a (potentially) longer recovery time. The core idea is that incremental checkpoints only record the changes in state since the previously completed checkpoint, instead of producing a full, self-contained backup of the state backend. In this way, incremental checkpoints can build upon previous checkpoints.

`RocksDBStateBackend` is currently the only backend that supports incremental checkpoints.

Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.

While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is a new feature that is currently not enabled by default.

To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the constructor set to `true`, e.g.:

```java
RocksDBStateBackend backend =
    new RocksDBStateBackend(filebackend, true);
```
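To see where this flag fits into a complete job setup, here is a minimal, self-contained sketch; the checkpoint URI (`hdfs:///flink/checkpoints`) and the one-minute checkpoint interval are illustrative assumptions, not values from the snippet above:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointingSetup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds (assumed interval, tune per job).
        env.enableCheckpointing(60_000);

        // The second constructor argument enables incremental checkpoints;
        // the URI is an assumed placeholder for your checkpoint directory.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
        env.setStateBackend(backend);

        // ... build the streaming topology and call env.execute(...) ...
    }
}
```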
### Use-case for Incremental Checkpoints

Checkpoints are the centerpiece of Flink's fault tolerance mechanism, and each checkpoint represents a consistent snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html)).

Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those (hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data source. The number of events that must be reprocessed has implications for recovery time, so for the fastest recovery we want to *take checkpoints as often as possible*.

However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create significant load for the I/O and network subsystems, on top of the normal load from the pipeline's data processing work.

Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing overhead: fast recovery and low checkpointing overhead were conflicting goals. This is exactly the problem that incremental checkpoints solve.

### Basics of Incremental Checkpoints

In this section, for the sake of getting the concept across, we briefly discuss the idea behind incremental checkpoints in a simplified manner.

Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically different: only a fraction of the state data is modified and some new data is added. Instead of writing the full state into each checkpoint again and again, we could record only the changes in state since the previous checkpoint. As long as we have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state of the job. This is the basic principle of incremental checkpoints: each checkpoint can build upon a history of previous checkpoints to avoid writing redundant information.
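To make the principle concrete, the following is a toy sketch of recording only the differences between two snapshots of a key-value state. This is purely illustrative; it is not Flink code, and Flink's RocksDB-based implementation works on RocksDB's internal files rather than by diffing key-value maps:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy representation of the changes between two state snapshots (illustration only). */
public class StateDelta {
    final Map<String, String> updated = new HashMap<>(); // entries added or overwritten
    final Set<String> removed = new HashSet<>();         // keys deleted since the last snapshot

    static StateDelta diff(Map<String, String> previous, Map<String, String> current) {
        StateDelta delta = new StateDelta();
        // Record every key whose value is new or has changed since the previous snapshot.
        for (Map.Entry<String, String> entry : current.entrySet()) {
            if (!entry.getValue().equals(previous.get(entry.getKey()))) {
                delta.updated.put(entry.getKey(), entry.getValue());
            }
        }
        // Record keys that existed before but have since been deleted.
        for (String key : previous.keySet()) {
            if (!current.containsKey(key)) {
                delta.removed.add(key);
            }
        }
        return delta;
    }
}
```

Note that deletions must be recorded explicitly; this is why the example in Figure 1 below also tracks the removal of a key.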

+ 3D"Figure +

With incremental checkpointing, each checkpoint contains only the state changes since the previous checkpoint.

* For the first checkpoint ``CP 1``, there is no difference between a full checkpoint and the complete state at the time the checkpoint is written.

* For ``CP 2``, incremental checkpointing will write only the changes since ``CP 1``: the value for ``K1`` has changed and a mapping for ``K3`` was added.

* For checkpoint ``CP 3``, incremental checkpointing only records the changes since ``CP 2``.

Notice that, unlike full checkpoints, an incremental checkpoint must also record changes that *delete* state, as in the case of ``K0``. In this simple example, we can see how incremental checkpointing can reduce the amount of data that is written for each checkpoint.

The next interesting question: how does restoring from incremental checkpoints compare to restoring from full checkpoints? Restoring a full checkpoint is as simple as loading all the data from the checkpoint back into the job state, because full checkpoints are self-contained. In contrast, to restore an incremental checkpoint, we need to replay the history of all incremental checkpoints that are in the reference chain of the checkpoint we are trying to restore. In our example from Figure 1, those connections are represented by the orange arrows. If we want to restore ``CP 3``, as a first step we need to apply all the changes of ``CP 1`` to the empty initial job state. On top of that, we apply the changes from ``CP 2``, and then the changes from ``CP 3``.

A different way to think about basic incremental checkpoints is to imagine them as a changelog with some aggregation. By aggregation we mean that if, for example, the state under key ``K1`` is overwritten multiple times between two consecutive checkpoints, we will only record the latest value at the time of the checkpoint. All previous changes are thereby subsumed.

This leads us to the potential *disadvantages* of incremental checkpoints compared to full checkpoints. While we save work in writing checkpoints, we have to do more work in reading the data from multiple checkpoints on recovery. Furthermore, we can no longer simply delete old checkpoints, because new checkpoints rely upon them, and so the history of old checkpoints can grow indefinitely over time (like a changelog).

At this point, it looks like we didn't gain much from incremental checkpoints, because we are again trading between checkpointing overhead and recovery time. Fortunately, there are ways to improve on this naive approach to recovery. One simple and obvious way to restrict recovery time and the length of the checkpoint history is to write a full checkpoint from time to time. We can drop all checkpoints prior to the most recent full checkpoint, and the full checkpoint can serve as a new basis for future incremental checkpoints.
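As an aside, the replay-based recovery described above can be sketched with the hypothetical `StateDelta` type from the earlier example; again, this illustrates the naive scheme, not Flink's actual recovery code:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NaiveRestore {

    /**
     * Rebuilds the full job state by replaying the reference chain of
     * incremental checkpoints (e.g. CP 1, CP 2, CP 3) in order.
     */
    static Map<String, String> restore(List<StateDelta> chain) {
        Map<String, String> state = new HashMap<>(); // the empty initial job state
        for (StateDelta delta : chain) {
            state.putAll(delta.updated);          // apply added and changed entries
            delta.removed.forEach(state::remove); // apply deletions
        }
        return state;
    }
}
```

That the cost of this loop grows with the length of the chain is exactly why the history must be bounded, either by periodic full checkpoints as described above or by the consolidation approach described next.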
Our actual implementation of incremental checkpoints in Flink is more involved and designed to address a number of different tradeoffs. Our incremental checkpointing restricts the size of the checkpoint history and therefore never needs to take a full checkpoint to keep recovery efficient! We present more detail about this in the next section, but the high-level idea is to accept a small amount of redundant state writing in order to incrementally introduce merged/consolidated replacements for previous checkpoints. For now, you can think of Flink's approach as stretching out and distributing the consolidation work over several incremental checkpoints, instead of doing it all at once in a full checkpoint. Every incremental checkpoint can contribute a share of the consolidation. We also track when old checkpoint data becomes obsolete and then prune the checkpoint history over time.

### Incremental Checkpoints in Flink

In the previous section, we discussed that incremental checkpointing is mainly about recording all effective state modifications between checkpoints. This poses certain requirements on the underlying data structures in the state backend that manages the job's state. It goes without saying that the data structures should always provide decent read-write performance to keep state access swift. At the same time, for incremental checkpointing, the state backend must be able to efficiently detect and iterate state modifications since the previous checkpoint.

--- End diff --

    and iterate over state modifications

> Improve and enhance documentation for incremental checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-7449
>                 URL: https://issues.apache.org/jira/browse/FLINK-7449
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>    Affects Versions: 1.4.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Minor
>
> We should provide more details about incremental checkpoints in the documentation.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)