From: Jason Brelloch
Date: Thu, 28 Jul 2016 13:57:39 -0400
Subject: Re: Reprocessing data in Flink / rebuilding Flink state
To: user@flink.apache.org

Hey Josh,

The way we replay historical data is that we have a second Flink job which listens to the same live stream and stores every single event in Google Cloud Storage.

When the main Flink job that is processing the live stream gets a request for a specific data set that it has not been processing yet, it sends a request to the historical Flink job for the old data. The live job then starts storing relevant events from the live stream in state. It continues storing the live events until all the events from the historical job have been processed, then it processes the stored events, and finally starts processing the live stream again.

As long as it's properly keyed (we key on the specific data set), it doesn't block anything, keeps everything ordered, and eventually catches up. It also allows us to completely blow away state and rebuild it from scratch.

So in your case it looks like what you could do is send a request to the "historical" job whenever you get an item that you don't yet have the current state of.

The potential problems you may have are that it may not be possible to store every single historical event, and that you need to make sure there is enough memory to handle the ever-increasing state size while the historical events are being replayed (and make sure to clear the state when it is done).

It's a little complicated, and pretty expensive, but it works. Let me know if something doesn't make sense.
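Roughly, the catch-up logic in the live job looks like the sketch below: the replayed historical stream and the live stream are connected and keyed by data set, and live events are buffered in keyed state until the replay for that key has finished. This is only a minimal illustration of the pattern, not our actual code - Event, isEndOfHistory(), and the end-of-history marker are made-up placeholders.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Input 1: events replayed by the historical job, ending with a marker event.
// Input 2: the live stream. Both inputs are keyed by the data set id.
public class ReplayThenLiveFunction extends RichCoFlatMapFunction<Event, Event, Event> {

    private transient ValueState<Boolean> historyDone; // has the replay finished for this key?
    private transient ListState<Event> bufferedLive;   // live events held back during the replay

    @Override
    public void open(Configuration parameters) {
        historyDone = getRuntimeContext().getState(
                new ValueStateDescriptor<>("historyDone", Boolean.class));
        bufferedLive = getRuntimeContext().getListState(
                new ListStateDescriptor<>("bufferedLive", Event.class));
    }

    @Override
    public void flatMap1(Event historical, Collector<Event> out) throws Exception {
        if (historical.isEndOfHistory()) {
            // Replay finished for this key: flush whatever arrived on the live stream meanwhile.
            Iterable<Event> buffered = bufferedLive.get();
            if (buffered != null) {
                for (Event event : buffered) {
                    out.collect(event);
                }
            }
            bufferedLive.clear();    // clear the buffer so state size stops growing
            historyDone.update(true);
        } else {
            out.collect(historical); // old events are processed first, in order
        }
    }

    @Override
    public void flatMap2(Event live, Collector<Event> out) throws Exception {
        if (Boolean.TRUE.equals(historyDone.value())) {
            out.collect(live);      // caught up: process live events directly
        } else {
            bufferedLive.add(live); // still replaying: hold the live event back
        }
    }
}

It would be wired up along the lines of historicalStream.connect(liveStream).keyBy(h -> h.getDataSetId(), l -> l.getDataSetId()).flatMap(new ReplayThenLiveFunction()) (again, getDataSetId() is a placeholder), so buffering happens per key and events for other keys keep flowing.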
On Thu, Jul 28, 2016 at 1:14 PM, Josh <jofo90@gmail.com> wrote:

> Hi all,
>
> I was wondering what approaches people usually take with reprocessing data
> with Flink - specifically the case where you want to upgrade a Flink job,
> and make it reprocess historical data before continuing to process a live
> stream.
>
> I'm wondering if we can do something similar to the 'simple rewind' or
> 'parallel rewind' which Samza uses to solve this problem, discussed here:
> https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html
>
> Having used Flink over the past couple of months, the main issue I've had
> involves Flink's internal state - from my experience it seems it is easy to
> break the state when upgrading a job, or when changing the parallelism of
> operators, plus there's no easy way to view/access an internal key-value
> state from outside Flink.
>
> For an example of what I mean, consider a Flink job which consumes a
> stream of 'updates' to items, and maintains a key-value store of items
> within Flink's internal state (e.g. in RocksDB). The job also writes the
> updated items to a Kafka topic:
>
> http://oi64.tinypic.com/34q5opf.jpg
>
> My worry with this is that the state in RocksDB could be lost or become
> incompatible with an updated version of the job. If this happens, we need
> to be able to rebuild Flink's internal key-value store in RocksDB. So I'd
> like to be able to do something like this (which I believe is the Samza
> solution):
>
> http://oi67.tinypic.com/219ri95.jpg
>
> Has anyone done something like this already with Flink? If so are there
> any examples of how to do this replay & switchover (rebuild state by
> consuming from a historical log, then switch over to processing the live
> stream)?
>
> Thanks for any insights,
> Josh

--
Jason Brelloch | Product Developer
3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
Subscribe to the BetterCloud Monitor - Get IT delivered to your inbox