From: Renjie Liu
Date: Tue, 01 Nov 2016 22:48:55 +0000
Subject: Re: Question about the checkpoint mechanism in Flink.
To: user@flink.apache.org

Hi Till,

I think "multiple inputs" should also include the more general case where
data is redistributed between subtasks, right? In that case we also need to
align checkpoint barriers.

On Tue, Nov 1, 2016 at 11:05 PM, Till Rohrmann <trohrmann@apache.org> wrote:

> The tuples are not buffered until the snapshot is globally complete (a
> snapshot is globally complete iff all operators have successfully taken a
> snapshot). They are only buffered until the corresponding checkpoint
> barrier on the second input is received. Once this is the case, the
> checkpoint barrier is sent directly to the downstream operators. Next, a
> snapshot is taken. Depending on the state backend, this can happen
> asynchronously or synchronously. After this is done, the operator
> continues processing elements (for the first input, the buffered elements
> are consumed first).
>
> By "multiple inputs" I meant a coFlatMap operator or a join operator,
> both of which have two inputs.
>
> Cheers,
> Till
>
> On Tue, Nov 1, 2016 at 3:29 PM, Renjie Liu <liurenjie2008@gmail.com> wrote:
>
> Hi Till,
> By "operator with multiple inputs", do you mean inputs from multiple
> subtasks?
>
> On Tue, Nov 1, 2016 at 8:56 PM Till Rohrmann <trohrmann@apache.org> wrote:
>
> Hi Li,
>
> the statement refers to operators with multiple inputs (two in this case).
> With the current implementation, you will indeed block one of the inputs
> after receiving checkpoint barrier n until you've received the
> corresponding checkpoint barrier n on the other input as well. This is
> what we call checkpoint barrier alignment. If the processing time on both
> input paths is similar, and thus there is no back pressure on either
> input, the alignment should not take too long. In cases where one of the
> inputs is considerably slower than the other, you should expect an
> additional delay.
>
> For single-input operators, you don't have to align the checkpoint
> barriers.
>
> The checkpoint barrier alignment is not strictly necessary, but it spares
> us from having to store all in-flight records from the second input that
> arrive between the checkpoint barrier on the first input and the
> corresponding barrier on the second input. We might change this
> implementation in the future, though.
>
> Cheers,
> Till
>
> On Tue, Nov 1, 2016 at 8:05 AM, Li Wang <wangli1426@gmail.com> wrote:
>
> Hi all,
>
> I have a question regarding the state checkpoint mechanism in Flink. I
> found the statement "Once the last stream has received barrier n, the
> operator emits all pending outgoing records, and then emits snapshot n
> barriers itself" in the documentation:
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#exactly-once-vs-at-least-once
>
> Does this mean that, to achieve exactly-once semantics, the operator does
> not send tuples downstream immediately but instead buffers its outgoing
> tuples in a pending queue until the current snapshot is committed? If so,
> will this introduce significant processing delay?
>
> Thanks,
> Li
>
>
> --
> Liu, Renjie
> Software Engineer, MVAD

--
Liu, Renjie
Software Engineer, MVAD
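The barrier alignment Till describes can be sketched as a small single-threaded Python simulation. Everything in it is illustrative: `Barrier`, `AligningOperator`, integer records, and the running-sum state are hypothetical names and assumptions, not Flink classes. The model also ignores networking, back pressure, and asynchronous snapshots.

It follows the order Till gives. Once barrier n has arrived on every input channel, the operator:

1. forwards the barrier,
2. takes the snapshot,
3. processes the records it held back on already-aligned channels before anything else.

Only inputs are held back, and only while alignment is in progress. Nothing waits for the checkpoint to complete globally. Because every input channel is treated the same way, the sketch covers both a two-input operator and an operator fed by several upstream subtasks.

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    checkpoint_id: int


class AligningOperator:
    """Toy operator with N input channels that aligns checkpoint barriers.

    Hypothetical sketch: records are integers and the operator state is
    their running sum. Barriers are assumed to arrive in order per channel.
    """

    def __init__(self, num_channels):
        self.num_channels = num_channels
        self.state = 0
        self.blocked = set()   # channels that already delivered the current barrier
        self.buffers = {c: deque() for c in range(num_channels)}
        self.output = []       # records and barriers emitted downstream, in order
        self.snapshots = {}    # checkpoint_id -> operator state at the barrier

    def receive(self, channel, element):
        if channel in self.blocked:
            # Input is blocked during alignment: hold the element back.
            self.buffers[channel].append(element)
        else:
            self._handle(channel, element)

    def _handle(self, channel, element):
        if isinstance(element, Barrier):
            self.blocked.add(channel)
            if len(self.blocked) == self.num_channels:
                self._complete_alignment(element)
        else:
            self.state += element
            self.output.append(element)  # records are emitted right away

    def _complete_alignment(self, barrier):
        self.output.append(barrier)                         # 1. forward barrier
        self.snapshots[barrier.checkpoint_id] = self.state  # 2. take snapshot
        self.blocked.clear()                                # 3. unblock inputs
        for channel, buffered in self.buffers.items():
            # Held-back elements go first; a buffered later barrier re-blocks
            # the channel and leaves the rest of its buffer in place.
            while buffered and channel not in self.blocked:
                self._handle(channel, buffered.popleft())


if __name__ == "__main__":
    op = AligningOperator(num_channels=2)
    op.receive(0, 1)
    op.receive(0, Barrier(1))   # channel 0 is now blocked
    op.receive(0, 10)           # held back: arrived after barrier 1 on channel 0
    op.receive(1, 2)            # channel 1 keeps flowing
    op.receive(1, Barrier(1))   # alignment complete
    print(op.output)            # [1, 2, Barrier(checkpoint_id=1), 10]
    print(op.snapshots)         # {1: 3}
```

In the demo, record 10 arrives on channel 0 after that channel's barrier. It is therefore excluded from snapshot 1 (state 1 + 2 = 3). It is emitted right after the barrier, as soon as channel 1 catches up.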
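Till notes that barrier alignment is not strictly necessary if the operator instead stores the in-flight records that arrive on the other input between the two barriers. A minimal sketch of that alternative resembles Chandy-Lamport channel-state recording:

- snapshot the state on the first barrier and forward it,
- keep processing every input,
- log the pre-barrier records from channels whose barrier is still outstanding, so a restore can replay them.

It uses the same toy assumptions: hypothetical names, integer records, a running-sum state, and at most one checkpoint in progress at a time. It illustrates the idea only and is not Flink's implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    checkpoint_id: int


class NonBlockingOperator:
    """Toy N-input operator that never blocks an input during a checkpoint.

    Hypothetical sketch: state is a running sum of integer records, and only
    one checkpoint may be in progress at a time.
    """

    def __init__(self, num_channels):
        self.num_channels = num_channels
        self.state = 0
        self.output = []
        self.pending = None        # id of the checkpoint currently in progress
        self.seen_barrier = set()  # channels that already delivered that barrier
        self.snapshots = {}        # checkpoint_id -> (state, logged in-flight records)

    def receive(self, channel, element):
        if isinstance(element, Barrier):
            self._on_barrier(channel, element)
            return
        if self.pending is not None and channel not in self.seen_barrier:
            # The record belongs before the checkpoint but is not in the state
            # snapshot, so it is logged as in-flight channel state.
            self.snapshots[self.pending][1].append((channel, element))
        self.state += element
        self.output.append(element)  # processed and emitted without delay

    def _on_barrier(self, channel, barrier):
        if self.pending is None:
            # First barrier: snapshot the state and forward the barrier at once.
            self.pending = barrier.checkpoint_id
            self.snapshots[self.pending] = (self.state, [])
            self.output.append(barrier)
        elif barrier.checkpoint_id != self.pending:
            raise ValueError("sketch supports one checkpoint in progress at a time")
        self.seen_barrier.add(channel)
        if len(self.seen_barrier) == self.num_channels:
            # Barrier seen on every channel: the logged channel state is complete.
            self.pending = None
            self.seen_barrier.clear()

    def restore(self, checkpoint_id):
        state, in_flight = self.snapshots[checkpoint_id]
        self.state = state
        for _channel, record in in_flight:
            # Replay logged records; here they are only folded into the state.
            self.state += record


if __name__ == "__main__":
    op = NonBlockingOperator(num_channels=2)
    op.receive(0, 1)
    op.receive(0, Barrier(1))
    op.receive(0, 10)          # after the barrier on channel 0: processed, not logged
    op.receive(1, 2)           # before the barrier on channel 1: processed and logged
    op.receive(1, Barrier(1))
    print(op.output)           # [1, Barrier(checkpoint_id=1), 10, 2]
    print(op.snapshots)        # {1: (1, [(1, 2)])}
    op.restore(1)
    print(op.state)            # 3
```

Here record 10 is no longer held back. Restoring checkpoint 1 yields 1 + 2 = 3: the state at the first barrier plus the logged record from channel 1. Record 10 arrived after the barrier on channel 0, so it is correctly left out. The price is that the logged records become part of the checkpoint, which is the storage cost Till mentions.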