From: "xiaogang.sxg" <xiaogang.sxg@alibaba-inc.com>
To: dev@flink.apache.org
Date: Mon, 14 Nov 2016 09:59:32 +0800
Subject: Re: [DISCUSS] FLIP-14: Loops API and Termination

Hi Paris

Unfortunately, the project is not public yet.
But I can provide you with a primitive implementation of the update protocol from the paper. It's implemented in Storm. Since the protocol assumes that the communication channels between tasks are bidirectional, I think it's not easy to adapt it to Flink.

Regards
Xiaogang


On 12 Nov 2016, at 3:03 AM, Paris Carbone <parisc@kth.se> wrote:

Hi Shi,

Naiad/Timely Dataflow and other projects use global coordination, which is very convenient for asynchronous progress tracking in general, but it has some downsides in production systems that rely on in-flight transactional control mechanisms and rollback-recovery guarantees. This is why we generally prefer decentralized approaches (despite their own downsides).

Regarding synchronous/structured iterations, this is a bit off topic, and they are a different story, as you already know.
We maintain a graph streaming library on Flink (gelly-streams) that you might find interesting [1]. Vasia, another Flink committer, is also working on it, among others.
You can keep an eye on it, since we are planning to use this project in the future as a showcase for a new way of doing structured and fixpoint iterations on streams.

P.S. Many thanks for sharing your publication; it was an interesting read. Do you happen to have your source code public? We could certainly use it in a benchmark soon.

[1] https://github.com/vasia/gelly-streaming


On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaogangg@gmail.com> wrote:

Hi, Fouad

Thank you for the explanation. Now the centralized method seems correct to me.
The passing of StatusUpdate events leads to synchronous iterations, and we use the information from each iteration to terminate the computation.

Actually, I prefer the centralized method because, in many applications, convergence may depend on global statistics.
For example, a PageRank program may terminate the computation once 99% of the vertices have converged.
I think learning programs that cannot reach a fixed point (because they oscillate around it) can benefit a lot from such features.
The decentralized method makes it hard to support such convergence conditions.
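A global criterion such as "stop once 99% of the vertices have converged" needs counts from all parallel tasks. The minimal Python sketch below illustrates the aggregation a centralized coordinator would perform. `PartitionStats`, `partition_stats` and `globally_converged` are hypothetical names, not Flink APIs, and `eps` is an assumed per-vertex tolerance.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass
class PartitionStats:
    """Statistics one parallel task reports after an iteration (hypothetical)."""
    converged: int  # vertices whose value changed by less than eps
    total: int      # vertices owned by this task


def partition_stats(old: Dict[str, float], new: Dict[str, float],
                    eps: float) -> PartitionStats:
    """Count the vertices of one partition that changed by less than eps."""
    converged = sum(1 for v, x in new.items() if abs(x - old.get(v, 0.0)) < eps)
    return PartitionStats(converged=converged, total=len(new))


def globally_converged(stats: Iterable[PartitionStats],
                       fraction: float = 0.99) -> bool:
    """True once at least `fraction` of all vertices, across all tasks, converged.

    The decision needs the counts of *every* task, so no single operator
    can take it using only its local state.
    """
    converged = total = 0
    for s in stats:
        converged += s.converged
        total += s.total
    return total == 0 or converged >= fraction * total
```

A decentralized protocol would have to carry these partial counts through the loop itself, which is the difficulty raised above.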

Another concern is that Flink cannot produce periodic results when iterating over infinite data streams.
Take a concrete example: given an edge stream that constructs a graph, the user may need the PageRank weight of each vertex in the graph as it stands at certain instants.
Currently Flink does not provide any input or iteration information to users, which makes such real-time iterative applications hard to implement.
Such features are supported in both Naiad and Tornado. I think Flink should support them as well.
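Without such support, one naive way to produce periodic results is to recompute on snapshots of the graph. The plain-Python sketch below assumes a snapshot is taken every `every` edges (a hypothetical trigger; a real job might use event-time instants) and runs power-iteration PageRank, with a fixed 50 iterations for simplicity, on the graph formed so far. It recomputes each snapshot from scratch, so it is only a baseline rather than an incremental approach.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Set, Tuple

Edge = Tuple[str, str]


def pagerank(out_edges: Dict[str, Set[str]], damping: float = 0.85,
             iterations: int = 50) -> Dict[str, float]:
    """Power-iteration PageRank on one static snapshot of the graph."""
    vertices: Set[str] = set(out_edges)
    for targets in out_edges.values():
        vertices |= targets
    n = len(vertices)
    if n == 0:
        return {}
    rank = {v: 1.0 / n for v in vertices}
    for _ in range(iterations):
        # Rank held by dangling vertices (no out-edges) is spread uniformly.
        dangling = sum(rank[v] for v in vertices if not out_edges.get(v))
        nxt = {v: (1.0 - damping) / n + damping * dangling / n for v in vertices}
        for src, targets in out_edges.items():
            if targets:
                share = damping * rank[src] / len(targets)
                for dst in targets:
                    nxt[dst] += share
        rank = nxt
    return rank


def snapshot_ranks(edges: Iterable[Edge], every: int) -> Iterator[Dict[str, float]]:
    """Yield the PageRank of the graph formed after every `every` edges."""
    out_edges: Dict[str, Set[str]] = defaultdict(set)
    for i, (src, dst) in enumerate(edges, start=1):
        out_edges[src].add(dst)
        if i % every == 0:
            yield pagerank(out_edges)
```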

What do you think?

Regards
Xiaogang


2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsayadi@gmail.com>:

Hi Shi,

It seems that you are referring to the centralized algorithm, which is no longer the proposed version.
In the decentralized version (check the latest doc) there is no master node or global coordination involved.

Let us keep this discussion to the decentralized one if possible.

To answer your points on the previous approach, there is a catch in your trace at t7. Here is what happens:
- The Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the runtime (see 2.1 in the steps).
- RS and the Heads will broadcast a StatusUpdate event and will not notify their status.
- When the StatusUpdate event gets back to the Head, it will notify its WORKING status.

Hope that answers your concern.

Best,
Fouad

On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaogangg@gmail.com> wrote:

Hi Paris

I have several concerns about the correctness of the termination protocol.
I think the termination protocol can put an end to the computation even when the computation has not converged.

Suppose there is a loop context consisting of an OP operator, a Head operator and a Tail operator (illustrated in Figure 2 of the first draft).
The stream contains only one record. OP will pass the record to its downstream operators 10 times; in other words, the loop should iterate 10 times.

If I understood the protocol correctly, the following event sequence may happen during the computation:
t1. RS emits Record to OP. Since RS has reached "end-of-stream", the system enters the Speculative Phase.
t2. OP receives Record and emits it to TAIL.
t3. HEAD receives the UpdateStatus event and notifies with an IDLE state.
t4. OP receives the UpdateStatus event from HEAD and notifies with a WORKING state.
t5. TAIL receives Record and emits it to HEAD.
t6. TAIL receives the UpdateStatus event from OP and notifies with a WORKING state.
t7. The system starts a new attempt. HEAD receives the UpdateStatus event and notifies with an IDLE state. (Record is still in transit.)
t8. OP receives the UpdateStatus event from HEAD and notifies with an IDLE state.
t9. TAIL receives the UpdateStatus event from OP and notifies with an IDLE state.
t10. HEAD receives Record from TAIL and emits it to OP.
t11. The system puts an end to the computation.

Though the computation is expected to iterate 10 times, it ends earlier.
The cause is that the communication channels MASTER=>HEAD and TAIL=>HEAD are not synchronized.

I think the protocol follows the idea of the Chandy-Lamport algorithm for determining a global state.
But the information about whether a node has processed any record since the last request is not STABLE.
Hence I doubt the correctness of the protocol.
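The effect of a record in transit can be reproduced with a toy model. In the hypothetical Python sketch below, a check that looks only at the states operators reported declares termination at t9, while a check that also counts records sent but not yet received on each loop channel does not. Reading such counters consistently across distributed tasks is itself non-trivial and is not addressed here.

```python
from collections import deque
from typing import Dict, List


class Channel:
    """FIFO channel that tracks how many records are still in transit."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.queue: deque = deque()
        self.sent = 0
        self.received = 0

    def send(self, record: str) -> None:
        self.queue.append(record)
        self.sent += 1

    def receive(self) -> str:
        record = self.queue.popleft()
        self.received += 1
        return record

    def in_flight(self) -> int:
        return self.sent - self.received


def naive_terminated(states: Dict[str, str]) -> bool:
    # Looks only at the states operators reported, as in t7-t9 of the trace.
    return all(s == "IDLE" for s in states.values())


def counting_terminated(states: Dict[str, str], channels: List[Channel]) -> bool:
    # Additionally requires every loop channel to be empty.
    return naive_terminated(states) and all(c.in_flight() == 0 for c in channels)


# State at t9: all operators reported IDLE, but the record TAIL emitted
# at t5 has not yet been delivered to HEAD (it only arrives at t10).
tail_to_head = Channel("TAIL=>HEAD")
tail_to_head.send("Record")
channels = [Channel("HEAD=>OP"), Channel("OP=>TAIL"), tail_to_head]
states = {"HEAD": "IDLE", "OP": "IDLE", "TAIL": "IDLE"}

print(naive_terminated(states))               # True: premature termination
print(counting_terminated(states, channels))  # False: Record is still in transit
```

At t10 the record reaches HEAD and re-enters the loop, which is exactly the work the state-only check missed.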

To determine termination correctly, we need some information that is stable.
In Timely Dataflow, Naiad collects the progress made in each iteration (identified by the timestamp vector) and terminates the loop when little progress is made in an iteration.
The information is stable because the result of an iteration cannot be changed by the execution of later iterations.
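The stability argument can be illustrated with per-iteration counts of outstanding records. The sketch below is a deliberately simplified, single-process model in Python; it is not Naiad's actual progress-tracking protocol (which tracks pointstamps across a distributed dataflow graph). `IterationProgress` and the `min_progress` threshold are hypothetical, and the model assumes finite input so that iteration 0 eventually drains.

```python
from collections import Counter


class IterationProgress:
    """Outstanding-record counts per loop iteration (hypothetical sketch).

    A record processed in iteration i can only produce records for
    iteration i + 1, so once no records are outstanding in iterations
    0..i, the result of iteration i can no longer change: it is stable.
    """

    def __init__(self) -> None:
        self.outstanding: Counter = Counter()  # emitted but not yet consumed
        self.produced: Counter = Counter()     # total records per iteration

    def emit(self, iteration: int) -> None:
        self.outstanding[iteration] += 1
        self.produced[iteration] += 1

    def consume(self, iteration: int) -> None:
        self.outstanding[iteration] -= 1

    def complete(self, iteration: int) -> bool:
        # Earlier iterations can still feed this one, so all must be drained.
        return all(self.outstanding[i] == 0 for i in range(iteration + 1))

    def should_terminate(self, iteration: int, min_progress: int) -> bool:
        # Stop once a completed (hence stable) iteration made little progress.
        return self.complete(iteration) and self.produced[iteration] < min_progress
```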

A similar method is also adopted in Tornado.
You can find more details about the termination of loops in my paper:
http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf

Regards
Xiaogang

2016-11-11 3:19 GMT+08:00 Paris Carbone <parisc@kth.se>:

Hi again Flink folks,

Here is our new proposal, which addresses job termination; the loop fault-tolerance proposal will follow shortly.
As Stephan hinted, we need operators to be aware of their scope level.

Thus, it is time we make loops great again! :)

Part of this FLIP introduces a new functional, compositional API for defining asynchronous loops on DataStreams.
This is coupled with a decentralized algorithm for job termination with loops, along the lines of what Stephan described.
We are already working on the actual prototypes, as you can see from the links in the doc.
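To make "functional, compositional" concrete, here is a toy, single-process model in Python. It is purely illustrative and is not the API proposed in the FLIP document: the hypothetical `loop` combinator takes a body that maps each record to the records it feeds back and the records it emits, and returns an ordinary function over a stream. Records are processed sequentially here, whereas the proposal targets asynchronous loops.

```python
from collections import deque
from typing import Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")

# A loop body maps one record to (records fed back, records emitted downstream).
Body = Callable[[T], Tuple[List[T], List[R]]]


def loop(body: Body) -> Callable[[Iterable[T]], List[R]]:
    """Turn a loop body into a reusable stream transformation (hypothetical API)."""
    def run(inputs: Iterable[T]) -> List[R]:
        pending = deque(inputs)
        out: List[R] = []
        while pending:  # terminates as soon as the loop is empty
            feedback, emitted = body(pending.popleft())
            pending.extend(feedback)
            out.extend(emitted)
        return out
    return run


def halve_until_odd(n: int) -> Tuple[List[int], List[int]]:
    # Feed even numbers back halved; emit odd numbers downstream.
    return ([n // 2], []) if n % 2 == 0 else ([], [n])


strip_twos = loop(halve_until_odd)
print(strip_twos([12, 7, 40]))  # [7, 3, 5]
```

Because `loop` returns an ordinary function, a body can itself call another loop, which is one way to picture nested (scoped) loops.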

Please let us know if you like (or don't like) it and why, in this mail discussion.

https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ

cheers
Paris and Fouad

On 31 Oct 2016, at 12:53, Paris Carbone <parisc@kth.se> wrote:

Hey Stephan,

Thanks for looking into it!

+1 for breaking this up, will do that.

I can see your point, and maybe it makes sense to introduce part of the scoping to incorporate support for nested loops (otherwise it can't work).
Let us think about this a bit. We will share another draft with a more detailed description of the approach you are suggesting asap.


On 27 Oct 2016, at 10:55, Stephan Ewen <sewen@apache.org> wrote:

How about we break this up into two FLIPs? There are, after all, two orthogonal problems (termination, fault tolerance) with quite different discussion states.

Concerning fault tolerance, I like the ideas.
For the termination proposal, I would like to iterate a bit more.

*Termination algorithm:*

My main concern here is the introduction of a termination coordinator and any involvement of RPC messages when deciding termination.
That would be such a fundamental break with the current runtime architecture, and it would make the currently very elegant and simple model much more complicated and harder to maintain. Given that Flink's runtime is complex enough, I would really like to avoid that.

The current runtime paradigm coordinates between operators strictly via in-band events. RPC calls happen between operators and the master for triggering and acknowledging execution and checkpoints.

I was wondering whether we can keep following that paradigm and still get most of what you are proposing here. In some sense, all we need to do is replace RPC calls with in-band events, and "decentralize" the coordinator such that every operator can make its own termination decision by itself.

This is only a rough sketch; you probably need to flesh it out more.

- I assume that the OP in the diagram knows that it is in a loop and that it is the one connected to the Head and Tail.

- When OP receives an EndOfStream event from the regular source (RS), it emits an "AttemptTermination" event downstream to the operators involved in the loop. It attaches an attempt sequence number and memorizes it.
- Tail and Head forward these events.
- When OP receives the event back with the same attempt sequence number, and no records came in the meantime, it shuts down and emits EndOfStream downstream.
- When other records came back between emitting the AttemptTermination event and receiving it back, it emits a new AttemptTermination event with the next sequence number.
- This should terminate as soon as the loop is empty.
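The steps above can be sketched as a single-process simulation. This is a hypothetical Python model, not Flink runtime code: Tail and Head only forward events, so they are collapsed into one FIFO `ring`, and `Record.hops_left` stands in for the loop logic of the ten-iteration example discussed in this thread. Records and `AttemptTermination` events share the same channel, mirroring the in-band paradigm described above.

```python
from collections import deque
from dataclasses import dataclass
from typing import List


@dataclass
class Record:
    value: str
    hops_left: int  # how many more times the record must travel around the loop


@dataclass
class AttemptTermination:
    seq: int


class LoopOperator:
    """OP from the diagram, with Tail and Head collapsed into one FIFO ring."""

    def __init__(self) -> None:
        self.ring: deque = deque()   # OP -> Tail -> Head -> OP, in-band and FIFO
        self.output: List[str] = []  # records leaving the loop downstream
        self.seq = 0                 # current attempt sequence number
        self.attempts = 0
        self.saw_records = False     # did records come back since the last attempt?
        self.finished = False

    def process(self, record: Record) -> None:
        if record.hops_left > 0:
            self.ring.append(Record(record.value, record.hops_left - 1))
        else:
            self.output.append(record.value)

    def on_end_of_stream(self) -> None:
        # EndOfStream from the regular source (RS) starts the first attempt.
        self._start_attempt()

    def _start_attempt(self) -> None:
        self.seq += 1
        self.attempts += 1
        self.saw_records = False
        self.ring.append(AttemptTermination(self.seq))

    def run(self) -> None:
        while self.ring and not self.finished:
            item = self.ring.popleft()
            if isinstance(item, Record):
                self.saw_records = True
                self.process(item)
            elif item.seq == self.seq:  # our attempt came back; stale ones are dropped
                if self.saw_records:
                    self._start_attempt()  # records came back: try again
                else:
                    self.finished = True   # loop is empty: OP may emit EndOfStream


op = LoopOperator()
op.process(Record("r", 10))  # RS delivers one record that must loop 10 times
op.on_end_of_stream()
op.run()
print(op.finished, op.output, op.attempts)  # True ['r'] 11
```

Because the events travel on the same FIFO channels as records, any record OP emitted before an attempt reaches OP again before that attempt does. That ordering is what the protocol relies on, and it is what an out-of-band MASTER=>HEAD channel does not provide.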

Might this model even generalize to nested loops, where the "AttemptTermination" event is scoped by the loop's nesting level?

Let me know what you think!


Best,
Stephan


On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <sewen@apache.org> wrote:

Hi!

I am still scanning it and compiling some comments. Give me a bit ;-)

Stephan


On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <parisc@kth.se> wrote:

Hey all,

Now that many of you have already scanned the document (judging from the views), maybe it is time to give back some feedback!
Did you like it? Would you suggest an improvement?

I would suggest not leaving this in the void. It has to do with important properties that the system promises to provide.
Fouad and I will do our best to answer your questions and discuss this further.

cheers
Paris

On 21 Oct 2016, at 08:54, Paris Carbone <parisc@kth.se> wrote:

Hello everyone,

Loops in Apache Flink have good potential to become much more powerful in future versions of Apache Flink.
There is generally high demand to make them usable and, first of all, production-ready for upcoming releases.

As a first commitment, we would like to propose FLIP-13 for consistent processing with loops.
We are also working on scoped loops for Q1 2017, which we can share if there is enough interest.

For now, this is an improvement proposal that solves two pending major issues:

1) The (not so trivial) problem of correct termination of jobs with iterations
2) The applicability of the checkpointing algorithm to iterative dataflow graphs

We would really appreciate it if you went through the linked draft (motivation and proposed changes) for FLIP-13 and pointed out comments, preferably publicly in this devlist discussion, before we go ahead and update the wiki.

https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing

cheers

Paris and Fouad
