From: Gyula Fóra
To: dev@flink.apache.org
Date: Mon, 08 Aug 2016 09:51:58 +0000
Subject: Re: Cassandra statebackend
archived-at: Mon, 08 Aug 2016 09:52:14 -0000

Hi,

I have done something similar in the past for storing state in sharded MySQL databases. We used this for a while for state-size scaling reasons, but later switched to RocksDB, so this state backend was removed from Flink to cut some maintenance costs.

You can find the initial PR, which contains the description, here:
https://github.com/apache/flink/pull/1305

Maybe it helps a little, I don't know :)

Cheers,
Gyula

On Mon, 8 Aug 2016 at 11:41, Aljoscha Krettek wrote:

> Hi,
> thanks for sharing the design doc, these are valuable ideas.
>
> We might have to revisit the specifics once the re-sharding/key-group
> changes are in Flink and once you actually want to start working on this.
>
> Cheers,
> Aljoscha
>
> On Sat, 6 Aug 2016 at 07:32 Chen Qin wrote:
>
> > Aljoscha,
> >
> > Sorry about the late reply.
> >
> > David and I drafted a design doc with some diagrams. We may not work on it
> > immediately, but we thought it would be valuable to share our thoughts and
> > hear feedback.
> >
> > https://docs.google.com/document/d/1diHQyOPZVxgmnmYfiTa6glLf-FlFjSHcL8J3YR2xLdk/edit#heading=h.12fh7saw98iz
> >
> > >about state lineage:
> >
> > One approach might be to add a pointer that keeps the data lineage between
> > an updated key in the first checkpoint and its corresponding restored
> > checkpoint_id. This assumes that restoring from a savepoint will not cause
> > the job manager to re-issue already used checkpoint ids.
> >
> > >clean up old states:
> >
> > The job manager already knows the savepoints and the latest successful
> > checkpoint. When a savepoint is created, it could be a good time for the
> > job manager to issue a clean-up message and ask each state to move its
> > effective key/values up to the current savepoint and delete anything before.
> > That doesn't need to be synchronized, since compaction changes only the
> > location where a value is stored, not the state's value itself. Deleting a
> > savepoint / checkpoint can also trigger compaction.
> >
> > Thanks,
> > Chen
> >
> > On Thu, Jul 28, 2016 at 6:59 AM, Aljoscha Krettek wrote:
> >
> > > Hi,
> > > thanks for opening the Jira issue. I'll continue the discussion here
> > > instead of in the Jira, I hope that's OK.
> > >
> > > That last paragraph of yours is the most interesting. We will have to adapt
> > > the way that checkpoints are stored to accommodate state backends that
> > > store state in some external system, such as Cassandra. Right now, each
> > > Checkpoint/Savepoint is stored in isolation and the system does not know
> > > about any relation between them. We have to introduce such a relation,
> > > basically putting the checkpoints into a graph structure that shows the
> > > lineage of the checkpoints. Then, when we are cleaning up old checkpoints,
> > > we check the ranges of (logical) timestamps of the checkpoints that we can
> > > remove and instruct the StateBackend to remove the relevant ranges.
> > >
> > > This leads to another interesting thing. We might need to have a
> > > StateBackend component running in the JobManager that we can invoke to
> > > delete ranges of checkpoints. Right now, a StateBackend only lives on the
> > > TaskManager, in the operators. Cleanup of time ranges, however, should
> > > probably happen in some centralized location.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Mon, 25 Jul 2016 at 22:38 Chen Qin wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > Cool! I created a JIRA for this.
> > > > https://issues.apache.org/jira/browse/FLINK-4266
> > > > Some comments inline.
> > > >
> > > > Chen
> > > >
> > > > On Mon, Jul 25, 2016 at 2:41 AM, Aljoscha Krettek <aljoscha@apache.org>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > I thought there was a Jira for that but I looked and couldn't find it.
> > > > > If you'd like, you can create one and we can discuss the design. Do you
> > > > > have any ideas yet?
> > > > >
> > > > > The tricky things I see in this are:
> > > > > - Knowing which data is the current data. This will require some kind
> > > > > of timestamps or increasing IDs.
> > > > >
> > > >
> > > > We are thinking of leveraging a client-assigned timestamp taken from
> > > > checkpoint_timestamp.
> > > >
> > > > > - Knowing when you can retire data from Cassandra
> > > > >
> > > >
> > > > That's the interesting part: each state checkpoint snapshot might
> > > > reference its previous snapshot. Deleting/consolidating the rows of the
> > > > previous snapshot under eventual consistency can be tricky.
> > > >
> > > > > Some of these might require some changes to how Flink handles
> > > > > checkpoints, and it somewhat goes in the direction of incremental
> > > > > checkpoints. That last part is especially important once you deal with
> > > > > savepoints, which can stay around indefinitely.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Mon, 25 Jul 2016 at 08:31 Tai Gordon wrote:
> > > > >
> > > > > > Hi Chen,
> > > > > >
> > > > > > AFAIK, there currently isn't any FLIP / JIRA / work for a
> > > > > > Cassandra state backend. I think it'll definitely be interesting to
> > > > > > have one in Flink.
> > > > > >
> > > > > > Regards,
> > > > > > Gordon
> > > > > >
> > > > > > On July 25, 2016 at 10:24:32 AM, Chen Qin (qinnchen@gmail.com) wrote:
> > > > > >
> > > > > > > Hi there,
> > > > > > >
> > > > > > > Are there any design docs or ongoing efforts there?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Chen
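A minimal Java sketch of the lineage and compaction ideas discussed in this thread, under several assumptions: an in-memory map stands in for the Cassandra table, every write is tagged with the id of the checkpoint it belongs to, checkpoint ids are never reused after a restore (the assumption Chen states), and values are never deleted (no tombstones). The class and method names (`VersionedStateSketch`, `completeCheckpoint`, `compactUpTo`) are hypothetical and are not Flink or Cassandra APIs; where the cleanup would run (JobManager or TaskManager) is left open.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical sketch, not a Flink or Cassandra API: each write is stored with
 * the id of the checkpoint it belongs to, and a lineage map records each
 * checkpoint's parent. Assumes checkpoint ids are never reused, even after a
 * restore from a savepoint.
 */
final class VersionedStateSketch {
    // key -> (checkpoint id -> value); stands in for rows in the external store
    private final Map<String, NavigableMap<Long, String>> rows = new HashMap<>();
    // checkpoint id -> parent checkpoint id (null for the root of the lineage)
    private final NavigableMap<Long, Long> parentOf = new TreeMap<>();

    /** Records a write made after the parent checkpoint and before checkpointId completed. */
    void write(String key, long checkpointId, String value) {
        rows.computeIfAbsent(key, k -> new TreeMap<>()).put(checkpointId, value);
    }

    /** Registers a completed checkpoint; the first one after a restore points at the restored id. */
    void completeCheckpoint(long checkpointId, Long parentId) {
        parentOf.put(checkpointId, parentId);
    }

    /** Value visible when restoring from checkpointId: the newest version along its lineage. */
    String read(String key, long checkpointId) {
        NavigableMap<Long, String> versions = rows.get(key);
        return versions == null ? null : latestOnLineage(versions, checkpointId);
    }

    // Walks checkpointId, its parent, its grandparent, ... and returns the first version found.
    private String latestOnLineage(NavigableMap<Long, String> versions, long checkpointId) {
        for (Long id = checkpointId; id != null; id = parentOf.get(id)) {
            String value = versions.get(id);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    /**
     * Compaction when a savepoint is taken: copy each key's effective value up to
     * savepointId, then range-delete every older version and lineage entry.
     */
    void compactUpTo(long savepointId) {
        for (NavigableMap<Long, String> versions : rows.values()) {
            String effective = latestOnLineage(versions, savepointId);
            if (effective != null) {
                versions.put(savepointId, effective);
            }
            versions.headMap(savepointId, false).clear(); // drop all ids < savepointId
        }
        rows.values().removeIf(Map::isEmpty); // keys that only lived on abandoned branches
        parentOf.headMap(savepointId, false).clear();
        parentOf.put(savepointId, null); // the savepoint becomes the new root
    }

    /** Total number of stored versions, to show what compaction removed. */
    int storedVersions() {
        return rows.values().stream().mapToInt(Map::size).sum();
    }
}
```

For example, if checkpoints 1, 2 and 3 complete, the job is then restored from 1, and the next checkpoint 4 records 1 as its parent, `read(key, 4)` returns the value written for checkpoint 1 rather than the values from the abandoned checkpoints 2 and 3. `compactUpTo(4)` then range-deletes everything with an id below 4, leaving at most one version per key. Because ids only increase, the cleanup reduces to an ordered range deletion, similar in spirit to the range removal Aljoscha describes.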