From: Paris Carbone
To: Stavros Kontopoulos
CC: user@flink.apache.org
Subject: Re: flink snapshotting fault-tolerance
Date: Thu, 19 May 2016 18:48:52 +0000
Sure, in practice you can set a threshold of retries, since an operator implementation could cause checkpoint failures indefinitely, or some other reason could make snapshotting generally infeasible. If I recall correctly, that threshold exists in the Flink configuration.

On 19 May 2016, at 20:42, Stavros Kontopoulos <st.kontopoulos@gmail.com> wrote:

The problem here is different, though: if something keeps failing (permanently), in practice someone needs to be notified. If the user loses snapshotting, he must know.

On Thu, May 19, 2016 at 9:36 PM, Abhishek R. Singh <abhishsi@tetrationanalytics.com> wrote:

I was wondering how checkpoints can be async, since your state is constantly mutating. You probably need versioned state, or immutable data structures?

-Abhishek-

On May 19, 2016, at 11:14 AM, Paris Carbone <parisc@kth.se> wrote:

Hi Stavros,

Currently, rollback failure recovery in Flink works at the pipeline level, not at the task level (for task-level recovery, see MillWheel [1]). It further builds on replayable stream logs (e.g. Kafka), so there is no need for 3PC or backups in the pipeline sources. You can also check this presentation [2], which I hope explains the basic concepts in more detail. Note that many upcoming optimisation opportunities are going to be addressed in the not-so-long-term Flink roadmap.

Paris

[1] http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf
[2] http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha

On 19 May 2016, at 19:43, Stavros Kontopoulos <st.kontopoulos@gmail.com> wrote:

Cool, thanks. So if a checkpoint expires, will the whole pipeline block or fail, will only the specific task related to the operator (running along with the checkpoint task) be affected, or will nothing happen?

On Tue, May 17, 2016 at 3:49 PM, Robert Metzger <rmetzger@apache.org> wrote:

Hi Stavros,

I haven't implemented our checkpointing mechanism and I didn't participate in the design decisions while implementing it, so I cannot compare it in detail to other approaches.
From a "does it work" perspective: checkpoints are only confirmed if all parallel subtasks successfully created a valid snapshot of the state. So if there is a failure in the checkpointing mechanism, no valid checkpoint will be created, and the system will recover from the last valid checkpoint.

There is a timeout for checkpoints: if a barrier doesn't pass through the system within a certain period of time, the checkpoint is cancelled. The default timeout is 10 minutes.

Regards,
Robert

On Mon, May 16, 2016 at 1:22 PM, Stavros Kontopoulos <st.kontopoulos@gmail.com> wrote:

Hi,

I was looking into the Flink snapshotting algorithm details, also mentioned here:

http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
https://blog.acolyer.org/2015/08/19/asynchronous-distributed-snapshots-for-distributed-dataflows/
http://mail-archives.apache.org/mod_mbox/flink-user/201601.mbox/%3CCANC1h_s6MCWSuDf2zSnEeD66LszDoLx0jt64++0kBOKTjkAv7w%40mail.gmail.com%3E
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-exactly-once-question-td2545.html

From other sources I understand that it assumes no failures in order to work, e.g. in message delivery, or a process hanging forever:

https://en.wikipedia.org/wiki/Snapshot_algorithm
https://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/

So my understanding (maybe wrong) is that this solution does not seem to address fault tolerance in a strong manner, as it would if, for example, it used a 3PC protocol for local state propagation and global agreement. I know the latter is not efficient; I am just mentioning it for comparison.

How does the algorithm behave in practical terms in the presence of its own failures (it is a background process collecting partial states)? Are there timeouts for reaching a barrier?

PS. I have not looked deep into the code details yet; planning to.
Best,
Stavros
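Robert's description of when a checkpoint counts as complete, together with the checkpoint timeout and the retry threshold Paris mentions, can be modelled roughly as below. This is a minimal Python sketch under stated assumptions, not Flink's implementation or API: `CheckpointCoordinator`, `PendingCheckpoint`, `max_consecutive_failures` and the `alerts` list are hypothetical names, time is passed in explicitly in seconds, and recording an alert after too many consecutive failures is an assumed stand-in for the notification Stavros asks for (the sketch does not fail the job).

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set

# From the thread: the default checkpoint timeout is 10 minutes.
DEFAULT_TIMEOUT_S = 10 * 60


@dataclass
class PendingCheckpoint:
    checkpoint_id: int
    started_at: float
    expected: Set[str]  # ids of all parallel subtasks that must acknowledge
    acks: Dict[str, str] = field(default_factory=dict)  # subtask id -> state handle


class CheckpointCoordinator:
    """Hypothetical model of the checkpoint life cycle discussed in the thread."""

    def __init__(self, subtasks: List[str], timeout_s: float = DEFAULT_TIMEOUT_S,
                 max_consecutive_failures: int = 3) -> None:
        self.subtasks = set(subtasks)
        self.timeout_s = timeout_s
        self.max_consecutive_failures = max_consecutive_failures  # assumed knob
        self.pending: Dict[int, PendingCheckpoint] = {}
        self.latest_completed: Optional[PendingCheckpoint] = None
        self.consecutive_failures = 0
        self.alerts: List[str] = []
        self._next_id = 1

    def trigger(self, now: float) -> int:
        """Start a checkpoint (conceptually: inject barriers at the sources)."""
        cp = PendingCheckpoint(self._next_id, now, set(self.subtasks))
        self.pending[cp.checkpoint_id] = cp
        self._next_id += 1
        return cp.checkpoint_id

    def acknowledge(self, checkpoint_id: int, subtask: str, state_handle: str) -> None:
        cp = self.pending.get(checkpoint_id)
        if cp is None or subtask not in cp.expected:
            return  # late ack for an already cancelled checkpoint: ignore it
        cp.acks[subtask] = state_handle
        if set(cp.acks) == cp.expected:
            # Confirmed only once *all* parallel subtasks have a valid snapshot.
            del self.pending[checkpoint_id]
            self.latest_completed = cp
            self.consecutive_failures = 0

    def decline(self, checkpoint_id: int, subtask: str) -> None:
        """One subtask could not snapshot its state, so the whole checkpoint is void."""
        if self.pending.pop(checkpoint_id, None) is not None:
            self._record_failure(checkpoint_id, f"declined by {subtask}")

    def check_timeouts(self, now: float) -> None:
        """Cancel checkpoints whose barriers did not get through in time."""
        for cp_id, cp in list(self.pending.items()):
            if now - cp.started_at >= self.timeout_s:
                del self.pending[cp_id]
                self._record_failure(cp_id, "expired")

    def _record_failure(self, cp_id: int, reason: str) -> None:
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.max_consecutive_failures:
            # Assumption: escalate to a human instead of silently retrying forever.
            self.alerts.append(
                f"checkpoint {cp_id} {reason}; "
                f"{self.consecutive_failures} consecutive failures")

    def restore_point(self) -> Optional[Dict[str, str]]:
        """State handles of the last completed checkpoint, used on recovery."""
        if self.latest_completed is None:
            return None
        return dict(self.latest_completed.acks)
```

In this model an expired or declined checkpoint is simply discarded: `restore_point()` keeps returning the last checkpoint that every subtask acknowledged, and a late acknowledgement for a discarded checkpoint is ignored.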
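One common answer to Abhishek's question about asynchronous checkpoints over constantly mutating state is copy-on-write: taking the snapshot only pins the current state (a cheap synchronous step), and the first write afterwards copies it, so the asynchronous part can serialize a version that no longer changes. The Python sketch below illustrates that idea with a hypothetical `CopyOnWriteState` class; it is not a description of how Flink's state backends implemented asynchronous checkpoints at the time.

```python
from typing import Callable, Dict, Hashable, Tuple

Snapshot = Tuple[int, Dict[Hashable, object]]


class CopyOnWriteState:
    """Hypothetical keyed state that supports consistent asynchronous snapshots."""

    def __init__(self) -> None:
        self._data: Dict[Hashable, object] = {}
        self._version = 0
        self._shared = False  # True while a pending snapshot still references _data

    def put(self, key: Hashable, value: object) -> None:
        if self._shared:
            # Copy-on-write: leave the snapshotted dict untouched, mutate a fresh copy.
            self._data = dict(self._data)
            self._shared = False
        self._data[key] = value
        self._version += 1

    def get(self, key: Hashable, default: object = None) -> object:
        return self._data.get(key, default)

    def snapshot(self) -> Callable[[], Snapshot]:
        # Synchronous part: O(1), just pin the current dict and its version.
        self._shared = True
        frozen, version = self._data, self._version

        def write_out() -> Snapshot:
            # Asynchronous part: may run later while processing continues.
            # `frozen` is never mutated again; copying it stands in for
            # serializing it to durable storage.
            return version, dict(frozen)

        return write_out
```

Copying the whole map on the first write after a snapshot keeps the sketch short; a real implementation would more likely copy at a finer granularity (for example per key or per bucket).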
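Paris's point about pipeline-level rollback over replayable stream logs can be illustrated with a toy pipeline: because the checkpoint stores the source's read offset together with the operator state, recovery resets every component to that checkpoint and re-reads the log from the stored offset, so no record is lost or counted twice in the state. `ReplayablePipeline` is a hypothetical single-source, single-operator model, and the Python list merely stands in for a replayable log such as a Kafka partition.

```python
from typing import List, Tuple


class ReplayablePipeline:
    """Hypothetical source + running-sum operator over a replayable log."""

    def __init__(self, log: List[int]) -> None:
        self.log = log  # stands in for a replayable log, e.g. a Kafka partition
        self.offset = 0  # source state: next position to read
        self.total = 0  # operator state: running sum
        self.checkpoint: Tuple[int, int] = (0, 0)  # (offset, total) of last checkpoint

    def process(self, n: int) -> None:
        """Read up to n records from the log and fold them into the state."""
        for _ in range(n):
            if self.offset >= len(self.log):
                return
            self.total += self.log[self.offset]
            self.offset += 1

    def take_checkpoint(self) -> None:
        # The source offset and the operator state are captured at the same
        # barrier, so together they describe one consistent point in the stream.
        self.checkpoint = (self.offset, self.total)

    def fail_and_recover(self) -> None:
        # Pipeline-level rollback: every component (source and operator) is
        # reset to the checkpoint, not just the task that failed.
        self.offset, self.total = self.checkpoint
```

In this toy model the log itself plays the role of the backup: records read after the checkpoint are simply read again after recovery, which is the intuition behind the remark that the sources need no 3PC or extra backup.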