From: Steven Wu <stevenz3wu@gmail.com>
Date: Tue, 26 May 2020 09:36:47 -0700
Subject: Re: RocksDB savepoint recovery performance improvements
To: Joey Pereira <joey@stripe.com>
Cc: user@flink.apache.org, Yun Tang <myasuka@live.com>, Mike Mintz <mikemintz@stripe.com>, Shahid Chohan <chohan@stripe.com>, Aaron Levin <aaronlevin@stripe.com>

Yun, you mentioned that checkpoint also supports rescale. I thought the recommendation [1] is to use a savepoint for rescaling.

[1] https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink
On Tue, May 26, 2020 at 6:46 AM Joey Pereira <joey@stripe.com> wrote:
Following up: I've put together the implementation, https://github.com/apache/flink/pull/12345. It's passing tests but is only partially complete, as it still needs some clean-up and configuration. I still need to try running this against a production cluster to check the performance, as well as getting some RocksDB benchmarks.

On Mon, May 18, 2020 at 3:46 PM Joey Pereira <joey@stripe.com> wrote:
Thanks Yun for highlighting this, it's very helpful! I'll give it a go with that in mind.

We have already begun using checkpoints for recovery. Having these improvements would still be immensely helpful to reduce downtime for savepoint recovery.

On Mon, May 18, 2020 at 3:14 PM Yun Tang <myasuka@live.com> wrote:
Hi Joey

Previously, I also looked at the mechanism to create on-disk SSTables, as I planned to use RocksDB's benchmark to mock the Flink scenario. However, I found the main challenge is how to ensure the keys are inserted in a strictly increasing order. The key order in Java can differ from the byte order in RocksDB. In your case, I think it could be much easier, as RocksFullSnapshotStrategy writes data per column family per key group, which should be in a strictly increasing order [1].
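
To make the ordering concern concrete, here is a minimal sketch (not Flink code; the names are made up) of the check that would be needed before handing keys to an SstFileWriter, assuming the column family uses RocksDB's default bytewise comparator:

    import java.util.Arrays;

    // Minimal sketch: verifies that serialized keys arrive in the unsigned
    // lexicographic byte order that RocksDB's default BytewiseComparator (and
    // therefore SstFileWriter) expects.
    public final class KeyOrderCheck {

        private byte[] previousKey;

        // Unsigned lexicographic comparison over raw bytes.
        static int compareUnsignedBytes(byte[] a, byte[] b) {
            int len = Math.min(a.length, b.length);
            for (int i = 0; i < len; i++) {
                int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
                if (cmp != 0) {
                    return cmp;
                }
            }
            return a.length - b.length;
        }

        // Throws if a key is not strictly greater than the previous one; an
        // SstFileWriter.put call would fail in the same situation.
        void checkNextKey(byte[] key) {
            if (previousKey != null && compareUnsignedBytes(previousKey, key) >= 0) {
                throw new IllegalStateException(
                    "Key " + Arrays.toString(key) + " is not strictly greater than the previous key");
            }
            previousKey = Arrays.copyOf(key, key.length);
        }
    }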

FLINK-17288 could mitigate the performance issue, and your solution could improve the performance much further (and could integrate with the state-processor-api story).

On the other hand, as an out-of-the-box option for your production scenario, how about using a checkpoint to recover, as it also supports rescaling and normal recovery?

[1] https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308

Best
Yun Tang

From: Joey Pereira <joey@stripe.com>
Sent: Tuesday, May 19, 2020 2:27
To: user@flink.apache.org
Cc: Mike Mintz <mikemintz@stripe.com>; Shahid Chohan <chohan@stripe.com>; Aaron Levin <aaronlevin@stripe.com>
Subject: RocksDB savepoint recovery performance improvements
Hey,

While running a Flink application with large state, savepoint recovery has been a painful part of operating the application because recovery time can be several hours. During some profiling that chohan (cc'd) had done, a red flag stood out: savepoint recovery consisted mostly of RocksDB Get and Put operations.

When Flink is bootstrapping state for RocksDB instances, this is not what I would have expected, as RocksDB supports direct ingestion of the on-disk format (SSTables): https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This was also recently reported on Jira: https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

* RocksDBWriteBatchWrapper is using RocksDB's WriteBatch operator. This will provide atomicity of batches as well as performance benefits for batching, compared to individual Puts, but it will still involve RocksDB's insert paths, which can involve expensive operations[0].
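
For reference, here is a rough sketch of this kind of batched write against the plain RocksDB Java API. It is illustrative only, not Flink's actual RocksDBWriteBatchWrapper, and the class and method names are made up:

    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    // Illustrative only: restores key/value pairs through a WriteBatch, i.e. the
    // normal insert path (memtable, then flush to SSTables, then compaction).
    public final class WriteBatchRestoreSketch {

        static void restore(RocksDB db, Iterable<byte[][]> entries) throws RocksDBException {
            try (WriteBatch batch = new WriteBatch();
                 WriteOptions writeOptions = new WriteOptions()) {
                for (byte[][] entry : entries) {
                    batch.put(entry[0], entry[1]); // entry[0] = key, entry[1] = value
                }
                db.write(writeOptions, batch); // atomic, but still exercises the memtable path
            }
        }
    }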

Instead, by creating SSTable files and instructing RocksDB to ingest the files, writes can be batched even further and expensive operations in RocksDB can be avoided. This is commonly utilized by other systems for restoration or import processes, such as in CockroachDB[1], TiDB[2], and Percona[3]. There are some restrictions on being able to generate SSTables, as well as limitations for ingestion to be performant. Unfortunately, it's all not very well documented:

1. When generating an SSTable, keys need to be inserted in-order.

2. Ingested files should not have key-ranges that overlap with either existing or other ingested files[4]. It is possible to ingest overlapping SSTables, but this may incur significant overhead.

To generate SSTables with non-overlapping key-ranges and to create them with keys in-order, it would mean that the savepoints would need to be ordered while processing them. I'm unsure if this is the case for how Flink's savepoints are stored.
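
To sketch what the generation-and-ingestion flow could look like against the RocksDB Java API (this is only an illustration of the API, not the proposed Flink change; it assumes the entries are already sorted in RocksDB's byte order and belong to the default column family):

    import java.util.Collections;

    import org.rocksdb.EnvOptions;
    import org.rocksdb.IngestExternalFileOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileWriter;

    // Illustrative only: writes pre-sorted key/value pairs into an SST file and
    // then asks RocksDB to ingest that file, bypassing the memtable insert path.
    public final class SstIngestSketch {

        static void generateAndIngest(RocksDB db, Iterable<byte[][]> sortedEntries, String sstPath)
                throws RocksDBException {
            try (EnvOptions envOptions = new EnvOptions();
                 Options options = new Options();
                 SstFileWriter writer = new SstFileWriter(envOptions, options)) {
                writer.open(sstPath);
                for (byte[][] entry : sortedEntries) {
                    // Keys must be added in strictly increasing byte order,
                    // otherwise SstFileWriter.put fails.
                    writer.put(entry[0], entry[1]);
                }
                writer.finish();
            }

            try (IngestExternalFileOptions ingestOptions =
                     new IngestExternalFileOptions().setMoveFiles(true)) {
                // Moves (rather than copies) the file into the DB directory. Ingesting
                // files whose key-ranges do not overlap existing data keeps this cheap.
                db.ingestExternalFile(Collections.singletonList(sstPath), ingestOptions);
            }
        }
    }

RocksDB.ingestExternalFile also has an overload that takes a ColumnFamilyHandle, so per-column-family SST files could presumably be ingested into their matching column families.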

I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is used (e.g. for incremental checkpoints or something else). I did notice it is iterating over a temporary RocksDB instance and inserting into a "final" instance. These writes could be optimized in a similar manner. Alternatively, it could be possible to use the temporary instance's SSTables, ingest them, and prune data out with RocksDB's DeleteRange.
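
A very rough sketch of that alternative (purely hypothetical; whether the temporary instance's SSTables can be handed over like this is exactly what would need to be checked, and the min/max key helpers below are placeholders):

    import java.util.List;

    import org.rocksdb.IngestExternalFileOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    // Hypothetical: ingest SST files taken from a temporary instance, then delete
    // the key ranges that fall outside the key groups this instance owns.
    public final class IngestAndPruneSketch {

        static void ingestAndPrune(RocksDB db, List<String> sstFiles,
                                   byte[] ownedRangeStart, byte[] ownedRangeEnd)
                throws RocksDBException {
            try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
                db.ingestExternalFile(sstFiles, ingestOptions);
            }
            // DeleteRange end keys are exclusive, so the owned range itself is kept.
            db.deleteRange(smallestKey(), ownedRangeStart);
            db.deleteRange(ownedRangeEnd, largestKey());
        }

        // Placeholders: a real implementation would derive these bounds from the
        // key-group prefix layout rather than hard-coding them.
        private static byte[] smallestKey() { return new byte[0]; }
        private static byte[] largestKey() {
            return new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
        }
    }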

To get started with prototyping, I was thinking of taking a simple approach of making an interface for RocksDBWriteBatchWrapper and swapping the implementation for one that does SSTable generation and ingestion. I reckon that will be an easy way to sanity check whether it works at all.
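
A hypothetical shape for that seam (names invented here, not from the Flink codebase):

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDBException;

    // Hypothetical seam for the prototype: the restore operations would write
    // through this interface, so the batching strategy can be swapped without
    // touching them.
    public interface BulkRestoreWriter extends AutoCloseable {

        // Buffer one key/value pair for the given column family.
        void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException;

        // Make everything buffered so far visible in the target RocksDB instance.
        void flush() throws RocksDBException;

        @Override
        void close() throws RocksDBException;
    }

    // Two implementations would then be compared:
    //  - a WriteBatch-based writer matching the current behaviour, and
    //  - an SST-based writer that builds files per column family / key group and
    //    calls RocksDB.ingestExternalFile on flush.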

I was also planning on writing some benchmarks in RocksDB to understand the difference for ingestion scenarios, as RocksDB itself is sparse on details about SSTable ingestion[4] and does not have benchmarking for ingestion.

Does all of that seem sound? I'll report back when I get time to work out that implementation and tests, likely during the coming weekend.


Joey

---

[0]: I don't have any specific sources on this. At a high level, some of the operations happening during writes include writing to the memtable before flushing to an SSTable, and doing merging and/or compaction. In general, these will add write-amplification and overall overhead to bulk insertion. These can largely be avoided by giving RocksDB SSTables, especially if they have non-overlapping key-ranges. "Characterizing, Modeling, and Benchmarking RocksDB Key-Value Workloads at Facebook" (https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a helpful source that highlights what happens during various workloads.


[1]: CockroachDB is a database that uses RocksDB as the on-disk storage. Their implementation consolidates bulk ingestion into an AddSSTable command. Primarily, they have some choice of options for SSTable generation and ingestion that are of interest:

* SSTable generation: https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L929-L966

* SSTable ingestion: https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L842-L917

[2]: TiDB, and TiKV, which is the KV layer of the database, use RocksDB as the on-disk storage. Their implementation of bulk ingestion is contained within: https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs

Other useful references:
- https://github.com/tikv/tikv/issues/2404, discussing performance for copy vs. move options.


[3]: Percona is a SQL database which supports a RocksDB backend. Their implementation of ingestion can be found here: https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815


[4]: Again, there are not a lot of official resources on this. Notable references I found include:

* https://github.com/facebook/rocksdb/issues/2473, which describes at a high level how re-insertions work.

* https://github.com/facebook/rocksdb/issues/3540, which describes the performance costs for ingesting overlapping SSTables, with specific benchmarks (post-fix) here: https://github.com/facebook/rocksdb/pull/3564

* https://github.com/facebook/rocksdb/pull/3179, which describes the mechanism for ingesting SSTable files: there need to be point-key overlap checks for the LSM.

* https://github.com/facebook/rocksdb/issues/5133, which indicates that re-ingesting the same SSTable (due to restarts in import processes) can cause issues for a particular set of options.

* https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245, which indicates that compaction occurs more (or only) when overlapping SSTables are ingested. The thinking here is that non-overlapping SSTable ingestion means very few operations (compaction, merging, etc.) occur afterward, with the right tuning for generation and ingestion.

* https://github.com/facebook/rocksdb/issues/5010, which discusses some unresolved issues for high CPU overhead on ingestion.