flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Thoughts About Streaming
Date Thu, 23 Jul 2015 13:33:12 GMT
Yes, this can be changed. After all, this is only a design document and
meant to be discussed here and then changed. :D

For 1) IMHO an ordering does not make sense if you don't set a partitioning
(what keyBy() basically does) because elements can be in arbitrary
partitions and then the sorting is basically arbitrary.

For 2) This makes sense, yes. It is very obvious that it is non-parallel
and if the user absolutely needs this he should be able to do it.
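
A small illustration of point 1, as a sketch in plain Java without any Flink API (the class and method names are made up for this example): if elements are spread over partitions without a key, sorting inside each partition says nothing about the order a downstream consumer sees.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PartitionOrderingSketch {

    /** Distributes elements round-robin over n partitions (no key), then sorts each partition. */
    static List<List<Integer>> sortWithinPartitions(List<Integer> input, int n) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            partitions.get(i % n).add(input.get(i));
        }
        for (List<Integer> partition : partitions) {
            Collections.sort(partition);
        }
        return partitions;
    }

    /** True if the list is in non-decreasing order. */
    static boolean isSorted(List<Integer> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = sortWithinPartitions(Arrays.asList(5, 1, 4, 2, 3, 0), 2);
        // Each partition is sorted on its own ...
        System.out.println(parts);
        // ... but reading one partition after the other is not a sorted stream.
        List<Integer> merged = new ArrayList<>(parts.get(0));
        merged.addAll(parts.get(1));
        System.out.println(isSorted(merged));
    }
}
```

With a key-based partitioning, all elements of one key land in the same partition, so a per-key order is at least well defined.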

On Thu, 23 Jul 2015 at 12:15 Matthias J. Sax <mjsax@informatik.hu-berlin.de>
wrote:

> Thanks for clarifying.
>
> For map/flatMap it makes sense. However, I would handle filter
> differently. Even if the re-grouping can be removed in an optimization
> step internally -- what might also be possible for map/flatMap -- I
> think it is annoying for the user to specify .byKey() twice...
>
> -> stream.byKey(MYKEY).filter(...).byKey(MYKEY).window(...)
> instead of the shorter
> -> stream.byKey(MYKEY).filter(...).window(...)
>
> Furthermore, I agree that the user should choose global order explicitly.
>
> However, I disagree at some point:
>
> 1) OrderedDataStream is not non-parallel from my point of view. It only
> requires to receive all tuples within a Stream-Partition in order (not
> sure, if there is any use-case for this though).
>
> 2) Why not support OrderedNonParallelStream? NonParallelStream is
> already non-parallel, so adding global ordering on top of it should be
> no problem (and sounds useful to me).
>
>
> -Matthias
>
>
> On 07/23/2015 11:37 AM, Aljoscha Krettek wrote:
> > Hi,
> > I'll try to answer these one at a time:
> >
> > 1) After a map/flatMap the key (and partitioning) of the data is lost,
> > that's why it goes back to a vanilla DataStream. I think filter also has
> > this behavior just to fit in with the other ones. Also, any chain of
> > filter/map/flatMap can also be expressed in a single flatMap.
> >
> > 2) An OrderedDataStream would require a global ordering of the elements
> > which is an inherently non-parallel operation. With the API rework we want
> > to make fast (i.e. parallel, in most cases) operations more prominent while
> > making it hard to specify operations that cannot run in parallel. As for
> > windows, the ordering of the elements inside a window is decoupled from any
> > ordering of a DataStream, the user should be able to specify whether his
> > windowing policies or windowing functions require the input elements to be
> > ordered by timestamp.
> >
> > Now for the Discretization. I think it makes sense to keep it in the same
> > document at the beginning but this has the potential to grow very large, at
> > which point it should be moved to its own document. (Exactly what Gyula
> > suggested.)
> >
> > On Thu, 23 Jul 2015 at 11:22 Matthias J. Sax <mjsax@informatik.hu-berlin.de>
> > wrote:
> >
> >> I just had a look into the "Streams+and+Operations+on+Streams" document.
> >>
> >> The initial figure is confusing... (it makes sense after reading the
> >> document but is a bumper in the beginning)
> >>
> >>
> >> A few comments/question:
> >>
> >> 1) Why is a (Ordered)KeyedDataStream converted into a DataStream if
> >> map/flatMap/filter is applied? (for broadcast/rebalance it makes sense
> >> to me)
> >>
> >> 2) Why is there only an ordered KEYED-DataStream? A
> >> NonParallelWindowStream might require ordered input, too. Maybe even an
> >> OrderedDataStream, i.e., non-keyed, would make sense, too.
> >>
> >>
> >> @Aljoscha: Decoupling API and window implementation is mandatory of
> >> course (from my point of view).
> >>
> >> @Gyula: I would prefer to extend the current document. It is easier to
> >> follow if all information is in a single place and not spread out.
> >>
> >>
> >> -Matthias
> >>
> >> On 07/23/2015 10:57 AM, Gyula Fóra wrote:
> >>> I think aside from the Discretization part we reached a consensus. I think
> >>> you can start with the implementation for the rest.
> >>>
> >>> I will do some updates to the Discretization part, and might even start a
> >>> new doc if it gets too long.
> >>>
> >>> Gyula
> >>>
> >>> Aljoscha Krettek <aljoscha@apache.org> wrote (on Thu, 23 Jul 2015 at 10:47):
> >>>
> >>>> What's the status of the discussion? What are the opinions on the reworking
> >>>> of the Streaming API as presented here:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
> >>>>
> >>>> If we could reach a consensus I would like to start working on this to have
> >>>> it done before the next release. In the process of this I would also like
> >>>> to decouple the current windowing implementation from the API to make it
> >>>> possible to select different windowing systems per job, as outlined here:
> >>>>
> >>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=60624830
> >>>>
> >>>> On Mon, 29 Jun 2015 at 10:55 Stephan Ewen <sewen@apache.org> wrote:
> >>>>
> >>>>> @Matthias:
> >>>>>
> >>>>> I think using the KeyedDataStream will simply result in smaller programs.
> >>>>> May be hard for some users to make the connection to a
> >>>>> 1-element-tumbling-window, simply because they want to use state. Not
> >>>>> everyone is as deep into that stuff as you are ;-)
> >>>>>
> >>>>>
> >>>>> On Sun, Jun 28, 2015 at 1:13 AM, Matthias J. Sax <
> >>>>> mjsax@informatik.hu-berlin.de> wrote:
> >>>>>
> >>>>>> Yes. But as I said, you can get the same behavior with a
> >>>>>> GroupedDataStream using a tumbling 1-tuple-size window. Thus, there is
> >>>>>> no conceptual advantage in using KeyedDataStream and no disadvantage in
> >>>>>> binding stateful operations to GroupedDataStreams.
> >>>>>>
> >>>>>> On 06/27/2015 06:54 PM, Márton Balassi wrote:
> >>>>>>> @Matthias: Your point of working with a minimal number of clear concepts is
> >>>>>>> desirable to say the least. :)
> >>>>>>>
> >>>>>>> The reasoning behind the KeyedDataStream is to associate Flink persisted
> >>>>>>> operator state with the keys of the data that produced it, so that stateful
> >>>>>>> computation becomes scalable in the future. This should not be tied to the
> >>>>>>> GroupedDataStream, especially not if we are removing the option to create
> >>>>>>> groups without windows as proposed on the Wiki by Stephan.
> >>>>>>>
> >>>>>>> On Sat, Jun 27, 2015 at 4:15 PM, Matthias J. Sax <
> >>>>>>> mjsax@informatik.hu-berlin.de> wrote:
> >>>>>>>
> >>>>>>>> This was more a conceptual point-of-view argument. From an
> >>>>>>>> implementation point of view, skipping the window building step is a
> >>>>>>>> good idea if a tumbling 1-tuple-size window is detected.
> >>>>>>>>
> >>>>>>>> I prefer to work with a minimum number of concepts (and apply internal
> >>>>>>>> optimization if possible) instead of using redundant concepts for
> >>>>>>>> special cases. Of course, this is my personal point of view.
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 06/27/2015 03:47 PM, Aljoscha Krettek wrote:
> >>>>>>>>> What do you mean by Comment 2? Using the whole window apparatus if you just
> >>>>>>>>> want to have, for example, a simple partitioned filter with partitioned
> >>>>>>>>> state seems a bit extravagant.
> >>>>>>>>>
> >>>>>>>>> On Sat, 27 Jun 2015 at 15:19 Matthias J. Sax <
> >>>>>>>> mjsax@informatik.hu-berlin.de>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Nice starting point.
> >>>>>>>>>>
> >>>>>>>>>> Comment 1:
> >>>>>>>>>> "Each individual stream partition delivers elements strictly in order."
> >>>>>>>>>> (in 'Parallel Streams, Partitions, Time, and Ordering')
> >>>>>>>>>>
> >>>>>>>>>> I would say "FIFO" and not "strictly in order". If data is not emitted
> >>>>>>>>>> in-order, the stream partition will not be in-order either.
> >>>>>>>>>>
> >>>>>>>>>> Comment 2:
> >>>>>>>>>> Why do we need KeyedDataStream? You can get everything done with
> >>>>>>>>>> GroupedDataStream (using a tumbling window of size = 1 tuple).
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 06/26/2015 07:42 PM, Stephan Ewen wrote:
> >>>>>>>>>>> Here is a first bit of what I have been writing down. Will add more over
> >>>>>>>>>>> the next days:
> >>>>>>>>>>>
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
> >>>>>>>>>>>
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <parisc@kth.se>
> >>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> +1 for writing this down
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 25 Jun 2015, at 18:11, Aljoscha Krettek <
> >>>> aljoscha@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> +1 go ahead
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <sewen@apache.org>
> >>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hey!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This thread covers many different topics. Let's break this up into separate
> >>>>>>>>>>>>>> discussions.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> - Operator State is already driven by Gyula and Paris and happening on the
> >>>>>>>>>>>>>> above mentioned pull request and the followup discussions.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> - For windowing, this discussion has brought some results that we should
> >>>>>>>>>>>>>> sum up and clearly write down.
> >>>>>>>>>>>>>>   I would like to chime in to do that based on what I learned from the
> >>>>>>>>>>>>>> document and this discussion. I also got some input from Marton about what
> >>>>>>>>>>>>>> he learned from mapping the Cloud DataFlow constructs to Flink.
> >>>>>>>>>>>>>>   I'll draft a Wiki page (with the help of Aljoscha, Marton) that sums
> >>>>>>>>>>>>>> this up and documents it for users (if we decide to adopt this).
> >>>>>>>>>>>>>>   Then we run this by Gyula, Matthias Sax and Kostas for feedback.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> - API style discussions should be yet another thread. This will probably
> >>>>>>>>>>>>>> be opened as people start to address that.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'll try to get a draft of the wiki version out tomorrow noon and send the
> >>>>>>>>>>>>>> link around.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Greetings,
> >>>>>>>>>>>>>> Stephan
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <
> >>>>>>>>>>>>>> mjsax@informatik.hu-berlin.de> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Sure. I picked this up. Using the current model for "occurrence time
> >>>>>>>>>>>>>>> semantics" does not work.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I elaborated on this in the past many times (but nobody cared). It is
> >>>>>>>>>>>>>>> important to make it clear to the user what semantics are supported.
> >>>>>>>>>>>>>>> Claiming to support "sliding windows" doesn't mean anything; there are
> >>>>>>>>>>>>>>> too many different semantics out there. :)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:
> >>>>>>>>>>>>>>>> Yes, I am aware of this requirement and it would also be supported in my
> >>>>>>>>>>>>>>>> proposed model.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The problem is that the "custom timestamp" feature gives the impression
> >>>>>>>>>>>>>>>> that the elements would be windowed according to a user-timestamp. The
> >>>>>>>>>>>>>>>> results, however, are wrong because of the assumption about elements
> >>>>>>>>>>>>>>>> arriving in order. (This is what I was trying to show with my fancy ASCII
> >>>>>>>>>>>>>>>> art and result output.)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <
> >>>>>>>>>>>>>>> mjsax@informatik.hu-berlin.de>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I like that you are pushing in this direction. However, IMHO you
> >>>>>>>>>>>>>>>>> misinterpret the current approach. It does not assume that tuples
> >>>>>>>>>>>>>>>>> arrive in-order; the current approach has no notion of a
> >>>>>>>>>>>>>>>>> pre-defined-order (for example, the order in which the events are
> >>>>>>>>>>>>>>>>> created). There is only the notion of "arrival-order" at the operator.
> >>>>>>>>>>>>>>>>> From this "arrival-order" perspective, the results are correct(!).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Windowing in the current approach means for example, "sum up an
> >>>>>>>>>>>>>>>>> attribute of all events you *received* in the last 5 seconds". That is a
> >>>>>>>>>>>>>>>>> different meaning than "sum up an attribute of all events that *occurred*
> >>>>>>>>>>>>>>>>> in the last 5 seconds". Both queries are valid and Flink should support
> >>>>>>>>>>>>>>>>> both IMHO.
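
The difference between the two queries can be sketched in plain Java. This is only an illustration: it uses no Flink API, and the `Event` class and both method names are hypothetical. The same three events give different sums depending on whether the window is evaluated on the time an event was received or on the time it occurred.

```java
import java.util.Arrays;
import java.util.List;

final class WindowSemanticsSketch {

    /** Hypothetical event carrying both its creation time and its arrival time (ms). */
    static final class Event {
        final long occurredAt;
        final long receivedAt;
        final long value;

        Event(long occurredAt, long receivedAt, long value) {
            this.occurredAt = occurredAt;
            this.receivedAt = receivedAt;
            this.value = value;
        }
    }

    /** Sum over events *received* in the interval (now - size, now]. */
    static long sumByArrival(List<Event> events, long now, long size) {
        long sum = 0;
        for (Event e : events) {
            if (e.receivedAt > now - size && e.receivedAt <= now) {
                sum += e.value;
            }
        }
        return sum;
    }

    /** Sum over events that *occurred* in the interval (now - size, now]. */
    static long sumByOccurrence(List<Event> events, long now, long size) {
        long sum = 0;
        for (Event e : events) {
            if (e.occurredAt > now - size && e.occurredAt <= now) {
                sum += e.value;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1_000, 1_100, 1),
                new Event(2_000, 9_000, 10),   // delayed: occurred early, received late
                new Event(8_000, 8_200, 100));
        System.out.println(sumByArrival(events, 10_000, 5_000));     // 110
        System.out.println(sumByOccurrence(events, 10_000, 5_000));  // 100
    }
}
```

Both answers are valid for their respective question; the delayed event is the only one that separates them.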
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
> >>>>>>>>>>>>>>>>>> Yes, now this also processes about 3 mio Elements (Window Size 5 sec, Slide
> >>>>>>>>>>>>>>>>>> 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Performance is not my main concern, however. My concern is that the current
> >>>>>>>>>>>>>>>>>> model assumes elements to arrive in order, which is simply not true.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> In your code you have these lines for specifying the window:
> >>>>>>>>>>>>>>>>>> .window(Time.of(1l, TimeUnit.SECONDS))
> >>>>>>>>>>>>>>>>>> .every(Time.of(1l, TimeUnit.SECONDS))
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Although this semantically specifies a tumbling window of size 1 sec I'm
> >>>>>>>>>>>>>>>>>> afraid it uses the sliding window logic internally (because of the
> >>>>>>>>>>>>>>>>>> .every()).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> In my tests I only have the first line.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <
> >>>> ggab90@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I'm very sorry, I had a bug in the InversePreReducer. It should be
> >>>>>>>>>>>>>>>>>>> fixed now. Can you please run it again?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I also tried to reproduce some of your performance numbers, but I'm
> >>>>>>>>>>>>>>>>>>> getting only less than 1/10th of yours. For example, in the Tumbling
> >>>>>>>>>>>>>>>>>>> case, Current/Reduce produces only ~100000 for me. Do you have any
> >>>>>>>>>>>>>>>>>>> idea what I could be doing wrong? My code:
> >>>>>>>>>>>>>>>>>>> http://pastebin.com/zbEjmGhk
> >>>>>>>>>>>>>>>>>>> I am running it on a 2 GHz Core i7.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Best regards,
> >>>>>>>>>>>>>>>>>>> Gabor
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljoscha@apache.org>:
> >>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>> I also ran the tests on top of PR 856 (inverse reducer) now. The results
> >>>>>>>>>>>>>>>>>>>> seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all
> >>>>>>>>>>>>>>>>>>>> the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec)
> >>>>>>>>>>>>>>>>>>>> (Theoretically there would be 5000 tuples in 5 seconds but this is due to
> >>>>>>>>>>>>>>>>>>>> overhead). These are the results for the inverse reduce optimisation:
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,38)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,829)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,1625)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,2424)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,3190)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,3198)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-339368)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-1315725)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-2932932)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-5082735)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-7743256)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,75701046)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,642829470)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,2242018381)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,5190708618)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,10060360311)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-94254951)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-219806321293)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-1258895232699)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-4074432596329)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> One line is one emitted window count. This is what happens when I remove
> >>>>>>>>>>>>>>>>>>>> the Thread.sleep(1):
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,660676)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,2553733)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,3542696)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,1)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,1107035)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,2549491)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,4100387)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406583360092)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406582150743)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406580427190)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406580427190)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-8406580427190)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,6847279255682044995)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,6847279255682044995)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-5390528042713628318)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-5390528042711551780)
> >>>>>>>>>>>>>>>>>>>> (Tuple 0,-5390528042711551780)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> So at some point the pre-reducer seems to go haywire and does not recover
> >>>>>>>>>>>>>>>>>>>> from it. The good thing is that it does produce results now, where the
> >>>>>>>>>>>>>>>>>>>> previous Current/Reduce would simply hang and not produce any output.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <
> >>>>> ggab90@gmail.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Aljoscha, can you please try the performance test of Current/Reduce
> >>>>>>>>>>>>>>>>>>>>> with the InversePreReducer in PR 856? (If you just call sum, it will
> >>>>>>>>>>>>>>>>>>>>> use an InversePreReducer.) It would be an interesting test, because
> >>>>>>>>>>>>>>>>>>>>> the inverse function optimization really depends on the stream being
> >>>>>>>>>>>>>>>>>>>>> ordered, and I think it has the potential of being faster than
> >>>>>>>>>>>>>>>>>>>>> Next/Reduce. Especially if the window size is much larger than the
> >>>>>>>>>>>>>>>>>>>>> slide size.
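
The general idea behind an inverse-function optimization can be sketched as follows. This is not the InversePreReducer from PR 856, whose code is not shown in this thread; it is a minimal plain-Java illustration that assumes a count-based sliding window and a sum, with hypothetical names throughout. Because a sum has an inverse (subtraction), each new element costs one addition and at most one subtraction, independent of the window size. It only works if elements are evicted in the same order they were added, which is why the optimization depends on an ordered stream.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class InverseReduceSketch {

    /**
     * Sums of a count-based sliding window holding `size` elements,
     * emitted once the window is full and then every `slide` elements.
     */
    static List<Long> slidingSums(long[] values, int size, int slide) {
        Deque<Long> window = new ArrayDeque<>();
        List<Long> out = new ArrayList<>();
        long sum = 0;
        int seen = 0;
        for (long v : values) {
            window.addLast(v);
            sum += v;                          // reduce step: add the new element
            if (window.size() > size) {
                sum -= window.removeFirst();   // inverse step: remove the oldest element
            }
            seen++;
            if (seen >= size && (seen - size) % slide == 0) {
                out.add(sum);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] values = {1, 2, 3, 4, 5, 6};
        System.out.println(slidingSums(values, 3, 1)); // [6, 9, 12, 15]
        System.out.println(slidingSums(values, 3, 2)); // [6, 12]
    }
}
```

The work per element stays constant as the window grows, which matches the expectation that the gain is largest when the window is much larger than the slide.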
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Best regards,
> >>>>>>>>>>>>>>>>>>>>> Gabor
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljoscha@apache.org>:
> >>>>>>>>>>>>>>>>>>>>>> I think I'll have to elaborate a bit so I created a proof-of-concept
> >>>>>>>>>>>>>>>>>>>>>> implementation of my Ideas and ran some throughput measurements to
> >>>>>>>>>>>>>>>>>>>>>> alleviate concerns about performance.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> First, though, I want to highlight again why the current approach does not
> >>>>>>>>>>>>>>>>>>>>>> work with out-of-order elements (which, again, occur constantly due to the
> >>>>>>>>>>>>>>>>>>>>>> distributed nature of the system). This is the example I posted earlier:
> >>>>>>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 . The plan looks like
> >>>>>>>>>>>>>>>>>>>>>> this:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> +--+
> >>>>>>>>>>>>>>>>>>>>>> | | Source
> >>>>>>>>>>>>>>>>>>>>>> +--+
> >>>>>>>>>>>>>>>>>>>>>> |
> >>>>>>>>>>>>>>>>>>>>>> +-----+
> >>>>>>>>>>>>>>>>>>>>>> | |
> >>>>>>>>>>>>>>>>>>>>>> | +--+
> >>>>>>>>>>>>>>>>>>>>>> | | | Identity Map
> >>>>>>>>>>>>>>>>>>>>>> | +--+
> >>>>>>>>>>>>>>>>>>>>>> | |
> >>>>>>>>>>>>>>>>>>>>>> +-----+
> >>>>>>>>>>>>>>>>>>>>>> |
> >>>>>>>>>>>>>>>>>>>>>> +--+
> >>>>>>>>>>>>>>>>>>>>>> | | Window
> >>>>>>>>>>>>>>>>>>>>>> +--+
> >>>>>>>>>>>>>>>>>>>>>> |
> >>>>>>>>>>>>>>>>>>>>>> |
> >>>>>>>>>>>>>>>>>>>>>> +--+
> >>>>>>>>>>>>>>>>>>>>>> | | Sink
> >>>>>>>>>>>>>>>>>>>>>> +--+
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> So all it does is pass the elements through an identity map and then merge
> >>>>>>>>>>>>>>>>>>>>>> them again before the window operator. The source emits ascending integers
> >>>>>>>>>>>>>>>>>>>>>> and the window operator has a custom timestamp extractor that uses the
> >>>>>>>>>>>>>>>>>>>>>> integer itself as the timestamp and should create windows of size 4 (that
> >>>>>>>>>>>>>>>>>>>>>> is, elements with timestamp 0-3 are one window, the next are the elements
> >>>>>>>>>>>>>>>>>>>>>> with timestamp 4-7, and so on). Since the topology basically doubles the
> >>>>>>>>>>>>>>>>>>>>>> elements from the source I would expect to get these windows:
> >>>>>>>>>>>>>>>>>>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
> >>>>>>>>>>>>>>>>>>>>>> Window: 4, 4, 5, 5, 6, 6, 7, 7
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The output is this, however:
> >>>>>>>>>>>>>>>>>>>>>> Window: 0, 1, 2, 3,
> >>>>>>>>>>>>>>>>>>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
> >>>>>>>>>>>>>>>>>>>>>> Window: 8, 9, 10, 11,
> >>>>>>>>>>>>>>>>>>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> >>>>>>>>>>>>>>>>>>>>>> Window: 16, 17, 18, 19,
> >>>>>>>>>>>>>>>>>>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
> >>>>>>>>>>>>>>>>>>>>>> Window: 24, 25, 26, 27,
> >>>>>>>>>>>>>>>>>>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The reason is that the elements simply arrive out-of-order. Imagine what
> >>>>>>>>>>>>>>>>>>>>>> would happen if the elements actually arrived with some delay from
> >>>>>>>>>>>>>>>>>>>>>> different operations.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Now, on to the performance numbers. The proof-of-concept I created is
> >>>>>>>>>>>>>>>>>>>>>> available here:
> >>>>>>>>>>>>>>>>>>>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock . The basic
> >>>>>>>>>>>>>>>>>>>>>> idea is that sources assign the current timestamp when emitting elements.
> >>>>>>>>>>>>>>>>>>>>>> They also periodically emit watermarks that tell us that no elements with
> >>>>>>>>>>>>>>>>>>>>>> an earlier timestamp will be emitted. The watermarks propagate through the
> >>>>>>>>>>>>>>>>>>>>>> operators. The window operator looks at the timestamp of an element and
> >>>>>>>>>>>>>>>>>>>>>> puts it into the buffer that corresponds to that window. When the window
> >>>>>>>>>>>>>>>>>>>>>> operator receives a watermark it will look at the in-flight windows
> >>>>>>>>>>>>>>>>>>>>>> (basically the buffers) and emit those windows where the window-end is
> >>>>>>>>>>>>>>>>>>>>>> before the watermark.
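
The buffering scheme described above can be sketched roughly like this. It is not the code from the linked proof-of-concept branch; it is a plain-Java illustration with hypothetical class and method names, and it assumes tumbling windows, non-negative timestamps, and an exclusive window end, so a window is emitted once its end is less than or equal to the watermark.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WatermarkWindowSketch {
    private final long size;
    // One buffer per in-flight window, keyed by window start, ordered by time.
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();

    WatermarkWindowSketch(long size) {
        this.size = size;
    }

    /** Puts the element into the buffer of the window its timestamp belongs to. */
    void onElement(long timestamp, long value) {
        long windowStart = timestamp - (timestamp % size);
        buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(value);
    }

    /** Emits and drops every buffered window that ends at or before the watermark. */
    List<List<Long>> onWatermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, List<Long>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> entry = it.next();
            if (entry.getKey() + size > watermark) {
                break; // this window and all later ones may still receive elements
            }
            emitted.add(entry.getValue());
            it.remove();
        }
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkWindowSketch op = new WatermarkWindowSketch(4);
        // Out-of-order arrival: element 4 shows up before the second copies of 0-3.
        for (long t : new long[] {0, 1, 2, 3, 4, 0, 1, 2, 3}) {
            op.onElement(t, t);
        }
        System.out.println(op.onWatermark(4)); // [[0, 1, 2, 3, 0, 1, 2, 3]]
        for (long t : new long[] {4, 5, 6, 7, 5, 6, 7}) {
            op.onElement(t, t);
        }
        System.out.println(op.onWatermark(8)); // [[4, 4, 5, 6, 7, 5, 6, 7]]
    }
}
```

Elements end up in the window their timestamp selects regardless of arrival order; only the order inside a buffer still reflects arrival.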
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For measuring throughput I did the following: The source emits tuples of
> >>>>>>>>>>>>>>>>>>>>>> the form ("tuple", 1) in an infinite loop. The window operator sums up the
> >>>>>>>>>>>>>>>>>>>>>> tuples, thereby counting how many tuples the window operator can handle in
> >>>>>>>>>>>>>>>>>>>>>> a given time window. There are two different implementations for the
> >>>>>>>>>>>>>>>>>>>>>> summation: 1) simply summing up the values in a mapWindow(), there you get
> >>>>>>>>>>>>>>>>>>>>>> a List of all tuples and simply iterate over it. 2) using sum(1), which is
> >>>>>>>>>>>>>>>>>>>>>> implemented as a reduce() (that uses the pre-reducer optimisations).
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> These are the performance numbers (Current is the current implementation,
> >>>>>>>>>>>>>>>>>>>>>> Next is my proof-of-concept):
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Tumbling (1 sec):
> >>>>>>>>>>>>>>>>>>>>>> - Current/Map: 1.6 mio
> >>>>>>>>>>>>>>>>>>>>>> - Current/Reduce: 2 mio
> >>>>>>>>>>>>>>>>>>>>>> - Next/Map: 2.2 mio
> >>>>>>>>>>>>>>>>>>>>>> - Next/Reduce: 4 mio
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Sliding (5 sec, slide 1 sec):
> >>>>>>>>>>>>>>>>>>>>>> - Current/Map: ca 3 mio (fluctuates a lot)
> >>>>>>>>>>>>>>>>>>>>>> - Current/Reduce: No output
> >>>>>>>>>>>>>>>>>>>>>> - Next/Map: ca 4 mio (fluctuates)
> >>>>>>>>>>>>>>>>>>>>>> - Next/Reduce: 10 mio
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The Next/Reduce variant can basically scale indefinitely with window size
> >>>>>>>>>>>>>>>>>>>>>> because the internal state does not rely on the number of elements (it is
> >>>>>>>>>>>>>>>>>>>>>> just the current sum). The pre-reducer for sliding elements cannot handle
> >>>>>>>>>>>>>>>>>>>>>> the amount of tuples, it produces no output. For the two Map variants the
> >>>>>>>>>>>>>>>>>>>>>> performance fluctuates because they always keep all the elements in an
> >>>>>>>>>>>>>>>>>>>>>> internal buffer before emission, this seems to tax the garbage collector a
> >>>>>>>>>>>>>>>>>>>>>> bit and leads to random pauses.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> One thing that should be noted is that I had to disable the fake-element
> >>>>>>>>>>>>>>>>>>>>>> emission thread, otherwise the Current versions would deadlock.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> So, I started working on this because I thought that out-of-order
> >>>>>>>>>>>>>>>>>>>>>> processing would be necessary for correctness. And it certainly is. But the
> >>>>>>>>>>>>>>>>>>>>>> proof-of-concept also shows that performance can be greatly improved.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.fora@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I agree, let's separate these topics from each other so we can get faster
> >>>>>>>>>>>>>>>>>>>>>>> resolution.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> There is already a state discussion in the thread we started with Paris.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzoumas@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I agree with supporting out-of-order out of the box :-), even if this means
> >>>>>>>>>>>>>>>>>>>>>>>> a major refactoring. This is the right time to refactor the streaming API
> >>>>>>>>>>>>>>>>>>>>>>>> before we pull it out of beta. I think that this is more important than new
> >>>>>>>>>>>>>>>>>>>>>>>> features in the streaming API, which can be prioritized once the API is out
> >>>>>>>>>>>>>>>>>>>>>>>> of beta (meaning, that IMO this is the right time to stall PRs until we
> >>>>>>>>>>>>>>>>>>>>>>>> agree on the design).
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> There are three sections in the document: windowing, state, and API. How
> >>>>>>>>>>>>>>>>>>>>>>>> convoluted are those with each other? Can we separate the discussion or do
> >>>>>>>>>>>>>>>>>>>>>>>> we need to discuss those all together? I think part of the difficulty is
> >>>>>>>>>>>>>>>>>>>>>>>> that we are discussing three design choices at once.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <
> >>>>>>>>>>>>>>>>>>> ted.dunning@gmail.com>
> >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Out of order is ubiquitous in the real-world.
> >>>>>> Typically,
> >>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>>>>>>> happens is
> >>>>>>>>>>>>>>>>>>>>>>>>> that businesses will declare a maximum allowable
> >>>>> delay
> >>>>>>>> for
> >>>>>>>>>>>>>>>>>>> delayed
> >>>>>>>>>>>>>>>>>>>>>>>>> transactions and will commit to results when that
> >>>>> delay
> >>>>>>>> is
> >>>>>>>>>>>>>>>>>>> reached.
> >>>>>>>>>>>>>>>>>>>>>>>>> Transactions that arrive later than this cutoff
> are
> >>>>>>>>>> collected
> >>>>>>>>>>>>>>>>>>>>>> specially
> >>>>>>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>>>> corrections which are reported/used when
> possible.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Clearly, ordering can also be violated during
> >>>>>> processing,
> >>>>>>>>>> but
> >>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> data
> >>>>>>>>>>>>>>>>>>>>>>>>> is originally out of order the situation can't be
> >>>>>>>> repaired
> >>>>>>>>>> by
> >>>>>>>>>>>>>>> any
> >>>>>>>>>>>>>>>>>>>>>>>> protocol
> >>>>>>>>>>>>>>>>>>>>>>>>> fixes that prevent transactions from becoming
> >>>>>> disordered
> >>>>>>>>>> but
> >>>>>>>>>>>>>> has
> >>>>>>>>>>>>>>>>>>> to be
> >>>>>>>>>>>>>>>>>>>>>>>> handled
> >>>>>>>>>>>>>>>>>>>>>>>>> at the data level.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha
> Krettek <
> >>>>>>>>>>>>>>>>>>>>> aljoscha@apache.org
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I also don't like big changes but sometimes they
> >>>> are
> >>>>>>>>>>>>>> necessary.
> >>>>>>>>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>>>>>>>> reason
> >>>>>>>>>>>>>>>>>>>>>>>>>> why I'm so adamant about out-of-order processing
> >>>> is
> >>>>>> that
> >>>>>>>>>>>>>>>>>>>>>> out-of-order
> >>>>>>>>>>>>>>>>>>>>>>>>>> elements are not some exception that occurs once
> >>>> in
> >>>>> a
> >>>>>>>>>> while;
> >>>>>>>>>>>>>>>>>>> they
> >>>>>>>>>>>>>>>>>>>>>> occur
> >>>>>>>>>>>>>>>>>>>>>>>>>> constantly in a distributed system. For example,
> >>>> in
> >>>>>>>> this:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>> https://gist.github.com/aljoscha/a367012646ab98208d27
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> resulting
> >>>>>>>>>>>>>>>>>>>>>>>>>> windows
> >>>>>>>>>>>>>>>>>>>>>>>>>> are completely bogus because the current
> windowing
> >>>>>>>> system
> >>>>>>>>>>>>>>>>>>> assumes
> >>>>>>>>>>>>>>>>>>>>>>>>> elements
> >>>>>>>>>>>>>>>>>>>>>>>>>> to globally arrive in order, which is simply not
> >>>>> true.
> >>>>>>>>>> (The
> >>>>>>>>>>>>>>>>>>>>> example
> >>>>>>>>>>>>>>>>>>>>>>>> has a
> >>>>>>>>>>>>>>>>>>>>>>>>>> source that generates increasing integers. Then
> >>>>> these
> >>>>>>>> pass
> >>>>>>>>>>>>>>>>>>>>> through a
> >>>>>>>>>>>>>>>>>>>>>>>> map
> >>>>>>>>>>>>>>>>>>>>>>>>>> and are unioned with the original DataStream
> >>>> before
> >>>>> a
> >>>>>>>>>> window
> >>>>>>>>>>>>>>>>>>>>>> operator.)
> >>>>>>>>>>>>>>>>>>>>>>>>>> This simulates elements arriving from different
> >>>>>>>> operators
> >>>>>>>>>> at
> >>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>> windowing
> >>>>>>>>>>>>>>>>>>>>>>>>>> operator. The example is also DOP=1, I imagine
> >>>> this
> >>>>> to
> >>>>>>>> get
> >>>>>>>>>>>>>>>>>>> worse
> >>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>>> higher DOP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> What do you mean by costly? As I said, I have a
> >>>>>>>>>>>>>>>>>>> proof-of-concept
> >>>>>>>>>>>>>>>>>>>>>>>>> windowing
> >>>>>>>>>>>>>>>>>>>>>>>>>> operator that can handle out-of-order elements.
> >>>> This
> >>>>>> is
> >>>>>>>> an
> >>>>>>>>>>>>>>>>>>> example
> >>>>>>>>>>>>>>>>>>>>>>>> using
> >>>>>>>>>>>>>>>>>>>>>>>>>> the current Flink API:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8
> >>>>>> .
> >>>>>>>>>>>>>>>>>>>>>>>>>> (It is an infinite source of tuples and a 5
> second
> >>>>>>>> window
> >>>>>>>>>>>>>>>>>>> operator
> >>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>> counts the tuples.) The first problem is that
> this
> >>>>>> code
> >>>>>>>>>>>>>>>>>>> deadlocks
> >>>>>>>>>>>>>>>>>>>>>>>> because
> >>>>>>>>>>>>>>>>>>>>>>>>>> of the thread that emits fake elements. If I
> >>>> disable
> >>>>>> the
> >>>>>>>>>>>> fake
> >>>>>>>>>>>>>>>>>>>>>> element
> >>>>>>>>>>>>>>>>>>>>>>>>> code
> >>>>>>>>>>>>>>>>>>>>>>>>>> it works, but the throughput using my mockup is
> 4
> >>>>>> times
> >>>>>>>>>>>>>> higher
> >>>>>>>>>>>>>>>>>>> .
> >>>>>>>>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>>>>>>> gap
> >>>>>>>>>>>>>>>>>>>>>>>>>> widens dramatically if the window size
> increases.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> So, it actually increases performance (unless
> I'm
> >>>>>>>> making a
> >>>>>>>>>>>>>>>>>>> mistake
> >>>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>> my
> >>>>>>>>>>>>>>>>>>>>>>>>>> explorations) and can handle elements that
> arrive
> >>>>>>>>>>>>>> out-of-order
> >>>>>>>>>>>>>>>>>>>>>> (which
> >>>>>>>>>>>>>>>>>>>>>>>>>> happens basically always in real-world
> windowing
> >>>>>>>>>>>>>> use-cases).
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <
> >>>>>>>>>> sewen@apache.org
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> What I like a lot about Aljoscha's proposed
> >>>> design
> >>>>> is
> >>>>>>>>>> that
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>> need no
> >>>>>>>>>>>>>>>>>>>>>>>>>>> different code for "system time" vs. "event
> >>>> time".
> >>>>> It
> >>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>>>> differs in
> >>>>>>>>>>>>>>>>>>>>>>>>>> where
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the timestamps are assigned.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> The OOP approach also gives you the semantics
> of
> >>>>>> total
> >>>>>>>>>>>>>>>>>>> ordering
> >>>>>>>>>>>>>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>>>>>>>>>>>>> imposing merges on the streams.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J.
> >>>> Sax <
> >>>>>>>>>>>>>>>>>>>>>>>>>>> mjsax@informatik.hu-berlin.de> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> I agree that there should be multiple
> >>>> alternatives
> >>>>>> the
> >>>>>>>>>>>>>>>>>>> user(!)
> >>>>>>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> choose from. Partial out-of-order processing
> >>>> works
> >>>>>> for
> >>>>>>>>>>>>>>>>>>>>> many/most
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> aggregates. However, if you consider
> >>>>>>>>>>>>>>>>>>> Event-Pattern-Matching,
> >>>>>>>>>>>>>>>>>>>>>> global
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> ordering is necessary (even if the performance
> >>>>>> penalty
> >>>>>>>>>>>>>>>>>>> might
> >>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>> high).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> I would also keep "system-time windows" as an
> >>>>>>>>>> alternative
> >>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> "source
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> assigned ts-windows".
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> It might also be interesting to consider the
> >>>>>> following
> >>>>>>>>>>>>>>>>>>> paper
> >>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> overlapping windows: "Resource sharing in
> >>>>> continuous
> >>>>>>>>>>>>>>>>>>>>>> sliding-window
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> aggregates"
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think we should not block PRs unnecessarily
> >>>> if
> >>>>>> your
> >>>>>>>>>>>>>>>>>>>>>> suggested
> >>>>>>>>>>>>>>>>>>>>>>>>>> changes
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> might touch them at some point.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also I still think we should not put
> everything
> >>>>> in
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> DataStream
> >>>>>>>>>>>>>>>>>>>>>>>>>>> because
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> it will be a huge mess.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also we need to agree on the out of order
> >>>>>> processing,
> >>>>>>>>>>>>>>>>>>>>> whether
> >>>>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>>> want
> >>>>>>>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the way you proposed it (which is quite
> costly).
> >>>>>>>> Another
> >>>>>>>>>>>>>>>>>>>>>>>> alternative
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> approach there which fits in the current
> >>>>> windowing
> >>>>>> is
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> filter
> >>>>>>>>>>>>>>>>>>>>>>>> out
> >>>>>>>>>>>>>>>>>>>>>>>>>> out of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> order events and apply a special handling
> >>>>> operator
> >>>>>> on
> >>>>>>>>>>>>>>>>>>> them.
> >>>>>>>>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> fairly lightweight.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> My point is that we need to consider some
> >>>>>> alternative
> >>>>>>>>>>>>>>>>>>>>>> solutions.
> >>>>>>>>>>>>>>>>>>>>>>>>> And
> >>>>>>>>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> should not block contributions along the way.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha
> >>>> Krettek
> >>>>> <
> >>>>>>>>>>>>>>>>>>>>>>>>>> aljoscha@apache.org>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The reason I posted this now is that we need
> >>>> to
> >>>>>>>> think
> >>>>>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> API
> >>>>>>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> windowing before proceeding with the PRs of
> >>>>> Gabor
> >>>>>>>>>>>>>>>>>>> (inverse
> >>>>>>>>>>>>>>>>>>>>>>>> reduce)
> >>>>>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula (removal of "aggregate" functions on
> >>>>>>>>>> DataStream).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the windowing, I think that the current
> >>>>> model
> >>>>>>>> does
> >>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>> work
> >>>>>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> out-of-order processing. Therefore, the
> whole
> >>>>>>>>>> windowing
> >>>>>>>>>>>>>>>>>>>>>>>>>> infrastructure
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> basically have to be redone. Meaning also
> that
> >>>>> any
> >>>>>>>>>> work
> >>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> pre-aggregators or optimizations that we do
> >>>> now
> >>>>>>>>>> becomes
> >>>>>>>>>>>>>>>>>>>>>> useless.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the API, I proposed to restructure the
> >>>>>>>>>> interactions
> >>>>>>>>>>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> different *DataStream classes and
> >>>>>>>> grouping/windowing.
> >>>>>>>>>>>>>>>>>>> (See
> >>>>>>>>>>>>>>>>>>>>>> API
> >>>>>>>>>>>>>>>>>>>>>>>>>> section
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the doc I posted.)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
> >>>>>>>>>>>>>>>>>>>>> gyula.fora@gmail.com
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the nice summary, this is a very
> >>>>> good
> >>>>>>>>>>>>>>>>>>>>> initiative.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I added some comments to the respective
> >>>>> sections
> >>>>>>>>>>>>>>>>>>> (where I
> >>>>>>>>>>>>>>>>>

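The cutoff-based handling described in the quoted thread (declare a maximum allowable delay, commit a window once that delay has passed, and collect later arrivals separately as corrections) can be sketched roughly as follows. This is only an illustrative sketch in plain Python, not Flink code and not the proof-of-concept operator from the gists. The class name `DelayBoundedWindowCounter`, its parameters, and the integer timestamps are all assumptions made for the example.

```python
from collections import defaultdict


class DelayBoundedWindowCounter:
    """Hypothetical tumbling-window counter keyed on event timestamps.

    A window [start, start + window_size) is committed once the highest
    timestamp seen so far, minus max_delay, has reached the window's end.
    Elements for already-closed windows are kept as corrections.
    """

    def __init__(self, window_size, max_delay):
        self.window_size = window_size
        self.max_delay = max_delay
        self.open_counts = defaultdict(int)  # window start -> running count
        self.committed = {}                  # window start -> final count
        self.corrections = []                # timestamps that arrived too late
        self.max_ts = None                   # highest timestamp seen so far

    def add(self, ts):
        # Advance the notion of "current time" from the data itself.
        if self.max_ts is None or ts > self.max_ts:
            self.max_ts = ts
        cutoff = self.max_ts - self.max_delay

        start = ts - ts % self.window_size
        if start + self.window_size <= cutoff:
            # The window for this element is already closed: record a correction.
            self.corrections.append(ts)
        else:
            # Out-of-order but within the allowed delay: count it normally.
            self.open_counts[start] += 1

        # Commit every open window whose end is at or before the cutoff.
        for s in sorted(self.open_counts):
            if s + self.window_size <= cutoff:
                self.committed[s] = self.open_counts.pop(s)


# Usage: windows of size 5, elements may be up to 2 time units late.
counter = DelayBoundedWindowCounter(window_size=5, max_delay=2)
for ts in [1, 3, 2, 6, 4, 8, 3, 11]:
    counter.add(ts)

print(counter.committed)          # {0: 4}
print(counter.corrections)        # [3]
print(dict(counter.open_counts))  # {5: 2, 10: 1}
```

In this run the element with timestamp 4 arrives after 6 but is still counted in window [0, 5), because it is within the allowed delay. The second 3 arrives after that window was committed and is reported as a correction instead. The sketch uses the same code path regardless of where the timestamps come from, which is the property highlighted in the thread for "system time" versus "event time".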