flink-dev mailing list archives

From "xiaogang.sxg" <xiaogang....@alibaba-inc.com>
Subject Re: [DISCUSS] FLIP-14: Loops API and Termination
Date Mon, 14 Nov 2016 01:59:32 GMT
Hi Paris

Unfortunately, the project is not public yet. 
But I can provide you with a primitive implementation of the update protocol from the paper. It is
implemented in Storm. Since the protocol assumes that the communication channels between different
tasks are dual (bidirectional), I think it is not easy to adapt it to Flink.

Regards
Xiaogang


> On 12 Nov 2016, at 3:03 AM, Paris Carbone <parisc@kth.se> wrote:
> 
> Hi Shi,
> 
> Naiad/Timely Dataflow and other projects use global coordination, which is very convenient
for asynchronous progress tracking in general, but it has some downsides in production systems
that count on in-flight transactional control mechanisms and rollback recovery guarantees.
This is why we generally prefer decentralized approaches (despite their own downsides).
> 
> Regarding synchronous/structured iterations, this is a bit off topic, and they are a bit
of a different story, as you already know.
> We maintain a graph streaming (gelly-streams) library on Flink that you might find interesting
[1]. Vasia, another Flink committer, is also working on it, among others.
> You can keep an eye on it, since we are planning to use this project as a showcase for
a new way of doing structured and fixpoint iterations on streams in the future.
> 
> P.S. Many thanks for sharing your publication, it was an interesting read. Do you happen
to have your source code public? We could most certainly use it in a benchmark soon.
> 
> [1] https://github.com/vasia/gelly-streaming
> 
> 
> On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaogangg@gmail.com> wrote:
> 
> Hi, Fouad
> 
> Thank you for the explanation. Now the centralized method seems correct to
> me.
> The passing of StatusUpdate events will lead to synchronous iterations, and
> we are using the information in each iteration to terminate the
> computation.
> 
> Actually, I prefer the centralized method because in many applications
> convergence may depend on some global statistics.
> For example, a PageRank program may terminate the computation when 99% of
> the vertices have converged.
> I think learning programs that cannot reach the fixed point (because they
> oscillate around it) can benefit a lot from such features.
> The decentralized method makes it hard to support such convergence
> conditions.
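A convergence condition based on a global statistic, such as the 99% example above, could look roughly like the following. This is a minimal Python sketch, not Flink, Naiad, or Tornado code; the function names, the dict-based rank representation, and the tolerance value are all assumptions introduced for illustration.

```python
def converged_fraction(old_ranks, new_ranks, tol=1e-6):
    """Fraction of vertices whose rank changed by at most `tol`.

    Both arguments are hypothetical dicts mapping vertex id -> rank.
    """
    if not new_ranks:
        return 1.0
    stable = sum(
        1 for vertex, rank in new_ranks.items()
        if abs(rank - old_ranks.get(vertex, 0.0)) <= tol
    )
    return stable / len(new_ranks)


def should_terminate(old_ranks, new_ranks, threshold=0.99, tol=1e-6):
    """Global condition: stop once `threshold` of all vertices are stable."""
    return converged_fraction(old_ranks, new_ranks, tol) >= threshold
```

The check needs the ranks of all vertices at once, which is why it is hard to evaluate without some form of global aggregation.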
> 
> 
> Another concern is that Flink cannot produce periodic results in an
> iteration over infinite data streams.
> Take a concrete example. Given an edge stream constructing a graph, the
> user may need the PageRank weight of each vertex in the graphs formed at
> certain instants.
> Currently Flink does not provide any input or iteration information to
> users, making it hard for users to implement such real-time iterative
> applications.
> Such features are supported in both Naiad and Tornado. I think Flink should
> support them as well.
> 
> What do you think?
> 
> Regards
> Xiaogang
> 
> 
> 2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsayadi@gmail.com>:
> 
> Hi Shi,
> 
> It seems that you are referring to the centralized algorithm which is no
> longer the proposed version.
> In the decentralized version (check last doc) there is no master node or
> global coordination involved.
> 
> Let us keep this discussion to the decentralized one if possible.
> 
> To answer your points on the previous approach, there is a catch in your
> trace at t7. Here is what is happening:
> - Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the
> runtime (see 2.1 in the steps).
> - RS and the Heads will broadcast a StatusUpdate event and will not notify
> their status.
> - When the StatusUpdate event gets back to the Head, it will notify its
> WORKING status.
> 
> Hope that answers your concern.
> 
> Best,
> Fouad
> 
> On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaogangg@gmail.com> wrote:
> 
> Hi Paris
> 
> I have several concerns about the correctness of the termination
> protocol.
> I think the termination protocol may put an end to the computation even when
> the computation has not converged.
> 
> Suppose there exists a loop context constructed by an OP operator, a Head
> operator and a Tail operator (illustrated in Figure 2 in the first
> draft).
> The stream contains only one record. OP will pass the record to its
> downstream operators 10 times. In other words, the loop should iterate 10
> times.
> 
> If I understood the protocol correctly, the following event sequence may
> happen in the computation:
> t1. RS emits Record to OP. Since RS has reached the "end-of-stream", the
> system enters the Speculative Phase.
> t2. OP receives Record and emits it to TAIL.
> t3. HEAD receives the UpdateStatus event and notifies with an IDLE state.
> t4. OP receives the UpdateStatus event from HEAD and notifies with a
> WORKING state.
> t5. TAIL receives Record and emits it to HEAD.
> t6. TAIL receives the UpdateStatus event from OP and notifies with a
> WORKING state.
> t7. The system starts a new attempt. HEAD receives the UpdateStatus event
> and notifies with an IDLE state. (Record is still in transit.)
> t8. OP receives the UpdateStatus event from HEAD and notifies with an IDLE
> state.
> t9. TAIL receives the UpdateStatus event from OP and notifies with an IDLE
> state.
> t10. HEAD receives Record from TAIL and emits it to OP.
> t11. The system puts an end to the computation.
> 
> Though the computation is expected to iterate 10 times, it ends earlier.
> The cause is that the communication channels MASTER=>HEAD and TAIL=>HEAD
> are not synchronized.
> 
> I think the protocol follows the idea of the Chandy-Lamport algorithm to
> determine a global state.
> But the information of whether a node has processed any record since the
> last request is not STABLE.
> Hence I doubt the correctness of the protocol.
> 
> To determine termination correctly, we need some information that is
> stable.
> In timely dataflow, Naiad collects the progress made in each iteration and
> terminates the loop when little progress is made in an iteration
> (identified by the timestamp vector).
> The information is stable because the result of an iteration cannot be
> changed by the execution of later iterations.
> 
> A similar method is also adopted in Tornado.
> You may see my paper for more details about the termination of loops:
> http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf
> 
> Regards
> Xiaogang
> 
> 2016-11-11 3:19 GMT+08:00 Paris Carbone <parisc@kth.se>:
> 
> Hi again Flink folks,
> 
> Here is our new proposal that addresses Job Termination - the loop fault
> tolerance proposal will follow shortly.
> As Stephan hinted, we need operators to be aware of their scope level.
> 
> Thus, it is time we make loops great again! :)
> 
> Part of this FLIP basically introduces a new functional, compositional API
> for defining asynchronous loops for DataStreams.
> This is coupled with a decentralized algorithm for job termination with
> loops - along the lines of what Stephan described.
> We are already working on the actual prototypes, as you can observe in the
> links of the doc.
> 
> Please let us know if you like (or don't like) it and why, in this mail
> discussion.
> 
> https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ
> 
> cheers
> Paris and Fouad
> 
> On 31 Oct 2016, at 12:53, Paris Carbone <parisc@kth.se> wrote:
> 
> Hey Stephan,
> 
> Thanks for looking into it!
> 
> +1 for breaking this up, will do that.
> 
> I can see your point, and maybe it makes sense to introduce part of scoping
> to incorporate support for nested loops (otherwise it can’t work).
> Let us think about this a bit. We will share another draft with a more
> detailed description of the approach you are suggesting asap.
> 
> 
> On 27 Oct 2016, at 10:55, Stephan Ewen <sewen@apache.org> wrote:
> 
> How about we break this up into two FLIPs? There are after all two
> orthogonal problems (termination, fault tolerance) with quite different
> discussion states.
> 
> Concerning fault tolerance, I like the ideas.
> For the termination proposal, I would like to iterate a bit more.
> 
> *Termination algorithm:*
> 
> My main concern here is the introduction of a termination coordinator and
> any involvement of RPC messages when deciding termination.
> That would be such a fundamental break with the current runtime
> architecture, and it would make the currently very elegant and simple model
> much more complicated and harder to maintain. Given that Flink's runtime is
> complex enough, I would really like to avoid that.
> 
> The current runtime paradigm coordinates between operators strictly via
> in-band events. RPC calls happen between operators and the master for
> triggering and acknowledging execution and checkpoints.
> 
> I was wondering whether we can keep following that paradigm and still get
> most of what you are proposing here. In some sense, all we need to do is
> replace RPC calls with in-band events, and "decentralize" the coordinator
> such that every operator can make its own termination decision by itself.
> 
> This is only a rough sketch, you probably need to flesh it out more.
> 
> - I assume that the OP in the diagram knows that it is in a loop and that
> it is the one connected to the head and tail
> 
> - When OP receives an EndOfStream event from the regular source (RS), it
> emits an "AttemptTermination" event downstream to the operators involved in
> the loop. It attaches an attempt sequence number and memorizes it
> - Tail and Head forward these events
> - When OP receives the event back with the same attempt sequence number,
> and no records came in the meantime, it shuts down and emits EndOfStream
> downstream
> - When other records came back between emitting the AttemptTermination
> event and receiving it back, it emits a new AttemptTermination event
> with the next sequence number
> - This should terminate as soon as the loop is empty.
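The in-band scheme in the list above can be simulated in a few lines. This is a toy Python model, not Flink code: it collapses the path OP -> Tail -> Head -> OP into a single FIFO queue, and the name run_loop, the initial_hops parameter, and the tuple encoding of events are assumptions made for illustration. The FIFO ordering is what the check relies on: any record emitted into the loop before an AttemptTermination event comes back to OP before that event does.

```python
from collections import deque


def run_loop(initial_hops):
    """Simulate OP and one feedback loop modelled as a single FIFO channel.

    initial_hops: for each source record, how many times OP must re-emit it
    into the loop. Returns (attempts_used, records_processed_by_op).
    """
    loop = deque()      # in-band channel OP -> Tail -> Head -> OP
    processed = 0

    # Records from the regular source (RS) arrive first.
    for hops in initial_hops:
        processed += 1
        if hops > 0:
            loop.append(("record", hops - 1))

    # EndOfStream from RS: emit the first AttemptTermination event.
    attempt = 1
    seen_record = False
    loop.append(("attempt", attempt))

    while loop:
        kind, value = loop.popleft()
        if kind == "record":
            processed += 1
            seen_record = True          # invalidates the current attempt
            if value > 0:
                loop.append(("record", value - 1))
        elif value == attempt:
            if not seen_record:
                return attempt, processed   # loop is empty: shut down
            attempt += 1                    # retry with next sequence number
            seen_record = False
            loop.append(("attempt", attempt))
    raise AssertionError("unreachable: an attempt event is always in flight")
```

In this model a single record that must circulate 10 times keeps invalidating attempts until it leaves the loop, so termination is only decided by the first attempt that travels around an empty loop.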
> 
> Might this model even generalize to nested loops, where the
> "AttemptTermination" event is scoped by the loop's nesting level?
> 
> Let me know what you think!
> 
> 
> Best,
> Stephan
> 
> 
> On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <sewen@apache.org> wrote:
> 
> Hi!
> 
> I am still scanning it and compiling some comments. Give me a bit ;-)
> 
> Stephan
> 
> 
> On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <parisc@kth.se> wrote:
> 
> Hey all,
> 
> Now that many of you have already scanned the document (judging from the
> views), maybe it is time to give some feedback!
> Did you like it? Would you suggest an improvement?
> 
> I would suggest not leaving this in the void. It has to do with
> important properties that the system promises to provide.
> Fouad and I will do our best to answer your questions and discuss this
> further.
> 
> cheers
> Paris
> 
> On 21 Oct 2016, at 08:54, Paris Carbone <parisc@kth.se> wrote:
> 
> Hello everyone,
> 
> Loops in Apache Flink have good potential to become a much more
> powerful feature in future versions of Apache Flink.
> There is generally high demand to make them usable and, first of all,
> production-ready for upcoming releases.
> 
> As a first commitment we would like to propose FLIP-13 for consistent
> processing with Loops.
> We are also working on scoped loops for Q1 2017 which we can share if
> there is enough interest.
> 
> For now, that is an improvement proposal that solves two pending major
> issues:
> 
> 1) The (not so trivial) problem of correct termination of jobs with
> iterations
> 2) The applicability of the checkpointing algorithm to iterative dataflow
> graphs.
> 
> We would really appreciate it if you go through the linked draft
> (motivation and proposed changes) for FLIP-13 and point out comments,
> preferably publicly in this devlist discussion before we go ahead and
> update the wiki.
> 
> https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing
> 
> cheers
> 
> Paris and Fouad

