flink-dev mailing list archives

From "Matthias J. Sax" <mj...@informatik.hu-berlin.de>
Subject Re: Thoughts About Streaming
Date Thu, 23 Jul 2015 09:19:30 GMT
I just had a look at the "Streams+and+Operations+on+Streams" document.

The initial figure is confusing... (it makes sense after reading the
document but is a stumbling block at the beginning)


A few comments/questions:

1) Why is an (Ordered)KeyedDataStream converted into a DataStream if
map/flatMap/filter is applied? (For broadcast/rebalance it makes sense
to me.)

2) Why is there only an ordered KEYED-DataStream? A
NonParallelWindowStream might require ordered input, too. Maybe even an
OrderedDataStream, i.e., non-keyed, would make sense, too.


@Aljoscha: Decoupling API and window implementation is mandatory, of
course (from my point of view).

@Gyula: I would prefer to extend the current document. It is easier to
follow if all information is in a single place and not spread out.


-Matthias

On 07/23/2015 10:57 AM, Gyula Fóra wrote:
> I think aside from the Discretization part we reached a consensus. I think
> you can start with the implementation for the rest.
> 
> I will do some updates to the Discretization part, and might even start a
> new doc if it gets too long.
> 
> Gyula
> 
> Aljoscha Krettek <aljoscha@apache.org> wrote (on Thu, Jul 23, 2015,
> 10:47):
> 
>> What's the status of the discussion? What are the opinions on the reworking
>> of the Streaming API as presented here:
>>
>> https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
>>
>> If we could reach a consensus I would like to start working on this to have
>> it done before the next release. In the process of this I would also like
>> to decouple the current windowing implementation from the API to make it
>> possible to select different windowing systems per job, as outlined here:
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=60624830
>>
>> On Mon, 29 Jun 2015 at 10:55 Stephan Ewen <sewen@apache.org> wrote:
>>
>>> @Matthias:
>>>
>>> I think using the KeyedDataStream will simply result in smaller programs.
>>> It may be hard for some users to make the connection to a
>>> 1-element tumbling window, simply because they want to use state. Not
>>> everyone is as deep into that stuff as you are ;-)
>>>
>>>
>>> On Sun, Jun 28, 2015 at 1:13 AM, Matthias J. Sax <
>>> mjsax@informatik.hu-berlin.de> wrote:
>>>
>>>> Yes. But as I said, you can get the same behavior with a
>>>> GroupedDataStream using a tumbling 1-tuple-size window. Thus, there is
>>>> no conceptual advantage in using KeyedDataStream and no disadvantage in
>>>> binding stateful operations to GroupedDataStreams.
>>>>
>>>> On 06/27/2015 06:54 PM, Márton Balassi wrote:
>>>>> @Matthias: Your point of working with a minimal number of clear concepts is
>>>>> desirable to say the least. :)
>>>>>
>>>>> The reasoning behind the KeyedDataStream is to associate Flink persisted
>>>>> operator state with the keys of the data that produced it, so that stateful
>>>>> computation becomes scalable in the future. This should not be tied to the
>>>>> GroupedDataStream, especially not if we are removing the option to create
>>>>> groups without windows as proposed on the Wiki by Stephan.
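The following sketch shows what "associating operator state with keys" means. It is written in plain Java, not Flink's API, and the class and method names are hypothetical. Each element is routed to a parallel instance by hashing its key. Each instance stores its state in a map indexed by that key. Because every piece of state belongs to exactly one key, it could in principle be re-partitioned by key when the parallelism changes, which is the scalability argument above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Flink API): operator state partitioned by key.
final class KeyedStateSketch {
    // One state map per parallel instance; each map is indexed by the element key.
    private final List<Map<String, Long>> instances = new ArrayList<>();

    KeyedStateSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new HashMap<>());
        }
    }

    // Route an element to an instance by hashing its key, as a keyed stream would.
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), instances.size());
    }

    // Stateful per-key running count; the state is stored under the element's key.
    long process(String key) {
        Map<String, Long> state = instances.get(partitionFor(key));
        return state.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        KeyedStateSketch op = new KeyedStateSketch(4);
        for (String key : new String[] {"a", "b", "a", "c", "a"}) {
            System.out.println(key + " -> " + op.process(key));
        }
        // prints a -> 1, b -> 1, a -> 2, c -> 1, a -> 3 (one per line)
    }
}
```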
>>>>>
>>>>> On Sat, Jun 27, 2015 at 4:15 PM, Matthias J. Sax <
>>>>> mjsax@informatik.hu-berlin.de> wrote:
>>>>>
>>>>>> This was more a conceptual point-of-view argument. From an
>>>>>> implementation point of view, skipping the window building step is a
>>>>>> good idea if a tumbling 1-tuple-size window is detected.
>>>>>>
>>>>>> I prefer to work with a minimum number of concepts (and apply internal
>>>>>> optimizations where possible) instead of using redundant concepts for
>>>>>> special cases. Of course, this is my personal point of view.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 06/27/2015 03:47 PM, Aljoscha Krettek wrote:
>>>>>>> What do you mean by Comment 2? Using the whole window apparatus if you just
>>>>>>> want to have, for example, a simple partitioned filter with partitioned
>>>>>>> state seems a bit extravagant.
>>>>>>>
>>>>>>> On Sat, 27 Jun 2015 at 15:19 Matthias J. Sax <mjsax@informatik.hu-berlin.de>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nice starting point.
>>>>>>>>
>>>>>>>> Comment 1:
>>>>>>>> "Each individual stream partition delivers elements strictly in order."
>>>>>>>> (in 'Parallel Streams, Partitions, Time, and Ordering')
>>>>>>>>
>>>>>>>> I would say "FIFO" and not "strictly in order". If data is not emitted
>>>>>>>> in order, the stream partition will not be in order either.
>>>>>>>>
>>>>>>>> Comment 2:
>>>>>>>> Why do we need KeyedDataStream? You can get everything done with a
>>>>>>>> GroupedDataStream (using a tumbling window of size = 1 tuple).
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 06/26/2015 07:42 PM, Stephan Ewen wrote:
>>>>>>>>> Here is a first bit of what I have been writing down. Will add more over
>>>>>>>>> the next days:
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Stream+Windows
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Parallel+Streams%2C+Partitions%2C+Time%2C+and+Ordering
>>>>>>>>>
>>>>>>>>> On Thu, Jun 25, 2015 at 6:35 PM, Paris Carbone <parisc@kth.se> wrote:
>>>>>>>>>
>>>>>>>>>> +1 for writing this down
>>>>>>>>>>
>>>>>>>>>>> On 25 Jun 2015, at 18:11, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> +1 go ahead
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 25 Jun 2015 at 18:02 Stephan Ewen <sewen@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey!
>>>>>>>>>>>>
>>>>>>>>>>>> This thread covers many different topics. Let's break this up into separate
>>>>>>>>>>>> discussions.
>>>>>>>>>>>>
>>>>>>>>>>>> - Operator State is already driven by Gyula and Paris and happening on the
>>>>>>>>>>>> above-mentioned pull request and the follow-up discussions.
>>>>>>>>>>>>
>>>>>>>>>>>> - For windowing, this discussion has brought some results that we should
>>>>>>>>>>>> sum up and clearly write down.
>>>>>>>>>>>>   I would like to chime in to do that based on what I learned from the
>>>>>>>>>>>> document and this discussion. I also got some input from Marton about what
>>>>>>>>>>>> he learned from mapping the Cloud DataFlow constructs to Flink.
>>>>>>>>>>>>   I'll draft a Wiki page (with the help of Aljoscha and Marton) that sums
>>>>>>>>>>>> this up and documents it for users (if we decide to adopt this).
>>>>>>>>>>>>   Then we run this by Gyula, Matthias Sax and Kostas for feedback.
>>>>>>>>>>>>
>>>>>>>>>>>> - API style discussions should be yet another thread. This will probably
>>>>>>>>>>>> be opened as people start to address that.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I'll try to get a draft of the wiki version out tomorrow noon and send the
>>>>>>>>>>>> link around.
>>>>>>>>>>>>
>>>>>>>>>>>> Greetings,
>>>>>>>>>>>> Stephan
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jun 25, 2015 at 3:51 PM, Matthias J. Sax <
>>>>>>>>>>>> mjsax@informatik.hu-berlin.de> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Sure. I picked this up. Using the current model for "occurrence time
>>>>>>>>>>>>> semantics" does not work.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have elaborated on this many times in the past (but nobody cared). It is
>>>>>>>>>>>>> important to make clear to the user which semantics are supported.
>>>>>>>>>>>>> Claiming to support "sliding windows" doesn't mean anything; there are
>>>>>>>>>>>>> too many different semantics out there. :)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:
>>>>>>>>>>>>>> Yes, I am aware of this requirement and it would also be supported in my
>>>>>>>>>>>>>> proposed model.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The problem is that the "custom timestamp" feature gives the impression
>>>>>>>>>>>>>> that the elements would be windowed according to a user timestamp. The
>>>>>>>>>>>>>> results, however, are wrong because of the assumption about elements
>>>>>>>>>>>>>> arriving in order. (This is what I was trying to show with my fancy ASCII
>>>>>>>>>>>>>> art and result output.)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <mjsax@informatik.hu-berlin.de>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I like that you are pushing in this direction. However, IMHO you
>>>>>>>>>>>>>>> misinterpret the current approach. It does not assume that tuples
>>>>>>>>>>>>>>> arrive in order; the current approach has no notion of a
>>>>>>>>>>>>>>> pre-defined order (for example, the order in which the events are
>>>>>>>>>>>>>>> created). There is only the notion of "arrival order" at the operator.
>>>>>>>>>>>>>>> From this "arrival order" perspective, the results are correct(!).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Windowing in the current approach means, for example, "sum up an
>>>>>>>>>>>>>>> attribute of all events you *received* in the last 5 seconds". That is a
>>>>>>>>>>>>>>> different meaning than "sum up an attribute of all events that *occurred*
>>>>>>>>>>>>>>> in the last 5 seconds". Both queries are valid and Flink should support
>>>>>>>>>>>>>>> both IMHO.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
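The difference between the two queries can be illustrated with a small plain-Java sketch. The `Event` class and its two timestamp fields are hypothetical. The same events are bucketed into 5-second tumbling windows twice: once by the time they were *received* and once by the time they *occurred*. A single event that occurs just before a window boundary but is received after it is enough to make the two results differ.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: "received in the last 5 s" vs. "occurred in the last 5 s".
final class ArrivalVsOccurrence {
    // Hypothetical event with a value and two timestamps in milliseconds.
    static final class Event {
        final long occurredAt;
        final long receivedAt;
        final long value;

        Event(long occurredAt, long receivedAt, long value) {
            this.occurredAt = occurredAt;
            this.receivedAt = receivedAt;
            this.value = value;
        }
    }

    // Sum values per tumbling window; window start = ts - (ts mod size).
    static Map<Long, Long> sumPerWindow(Event[] events, long size, boolean byOccurrence) {
        Map<Long, Long> sums = new TreeMap<>();
        for (Event e : events) {
            long ts = byOccurrence ? e.occurredAt : e.receivedAt;
            long start = ts - Math.floorMod(ts, size);
            sums.merge(start, e.value, Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        Event[] events = {
            new Event(1_000, 1_100, 1),
            new Event(4_000, 4_200, 1),
            new Event(4_900, 6_500, 1), // occurred in [0, 5000), received in [5000, 10000)
            new Event(6_000, 6_100, 1),
        };
        System.out.println("received: " + sumPerWindow(events, 5_000, false)); // {0=2, 5000=2}
        System.out.println("occurred: " + sumPerWindow(events, 5_000, true));  // {0=3, 5000=1}
    }
}
```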
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
>>>>>>>>>>>>>>>> Yes, now this also processes about 3 mio elements (window size 5 sec,
>>>>>>>>>>>>>>>> slide 1 sec), but it still fluctuates a lot between 1 mio and 5 mio.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Performance is not my main concern, however. My concern is that the current
>>>>>>>>>>>>>>>> model assumes elements arrive in order, which is simply not true.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In your code you have these lines for specifying the window:
>>>>>>>>>>>>>>>> .window(Time.of(1L, TimeUnit.SECONDS))
>>>>>>>>>>>>>>>> .every(Time.of(1L, TimeUnit.SECONDS))
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Although this semantically specifies a tumbling window of size 1 sec, I'm
>>>>>>>>>>>>>>>> afraid it uses the sliding window logic internally (because of the
>>>>>>>>>>>>>>>> .every()).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In my tests I only have the first line.
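The sketch below illustrates the point about `.every()` with a generic sliding-window assignment. It is plain Java with hypothetical names, not Flink's internal code. When the slide is smaller than the size, a timestamp belongs to several overlapping windows. When slide == size, the tumbling case, it belongs to exactly one. Whether a given implementation detects that special case is a separate question; this sketch only shows the semantics.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: which sliding windows [start, start + size) contain ts?
final class SlidingAssigner {
    // Windows start every `slide` time units; assumes size > 0 and slide > 0.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // size 5 s, slide 1 s: five overlapping windows
        System.out.println(windowStarts(7_300, 5_000, 1_000)); // [7000, 6000, 5000, 4000, 3000]
        // size 1 s, slide 1 s: degenerates to a single tumbling window
        System.out.println(windowStarts(7_300, 1_000, 1_000)); // [7000]
    }
}
```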
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <ggab90@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm very sorry, I had a bug in the InversePreReducer. It should be
>>>>>>>>>>>>>>>>> fixed now. Can you please run it again?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I also tried to reproduce some of your performance numbers, but I'm
>>>>>>>>>>>>>>>>> getting less than 1/10th of yours. For example, in the Tumbling
>>>>>>>>>>>>>>>>> case, Current/Reduce produces only ~100000 for me. Do you have any
>>>>>>>>>>>>>>>>> idea what I could be doing wrong? My code:
>>>>>>>>>>>>>>>>> http://pastebin.com/zbEjmGhk
>>>>>>>>>>>>>>>>> I am running it on a 2 GHz Core i7.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>> Gabor
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <aljoscha@apache.org>:
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>> I also ran the tests on top of PR 856 (inverse reducer) now. The results
>>>>>>>>>>>>>>>>>> seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all
>>>>>>>>>>>>>>>>>> the previous tests reported around 3600 tuples (size 5 sec, slide 1 sec).
>>>>>>>>>>>>>>>>>> (Theoretically there would be 5000 tuples in 5 seconds; the shortfall is
>>>>>>>>>>>>>>>>>> due to overhead.) These are the results for the inverse reduce optimisation:
>>>>>>>>>>>>>>>>>> (Tuple 0,38)
>>>>>>>>>>>>>>>>>> (Tuple 0,829)
>>>>>>>>>>>>>>>>>> (Tuple 0,1625)
>>>>>>>>>>>>>>>>>> (Tuple 0,2424)
>>>>>>>>>>>>>>>>>> (Tuple 0,3190)
>>>>>>>>>>>>>>>>>> (Tuple 0,3198)
>>>>>>>>>>>>>>>>>> (Tuple 0,-339368)
>>>>>>>>>>>>>>>>>> (Tuple 0,-1315725)
>>>>>>>>>>>>>>>>>> (Tuple 0,-2932932)
>>>>>>>>>>>>>>>>>> (Tuple 0,-5082735)
>>>>>>>>>>>>>>>>>> (Tuple 0,-7743256)
>>>>>>>>>>>>>>>>>> (Tuple 0,75701046)
>>>>>>>>>>>>>>>>>> (Tuple 0,642829470)
>>>>>>>>>>>>>>>>>> (Tuple 0,2242018381)
>>>>>>>>>>>>>>>>>> (Tuple 0,5190708618)
>>>>>>>>>>>>>>>>>> (Tuple 0,10060360311)
>>>>>>>>>>>>>>>>>> (Tuple 0,-94254951)
>>>>>>>>>>>>>>>>>> (Tuple 0,-219806321293)
>>>>>>>>>>>>>>>>>> (Tuple 0,-1258895232699)
>>>>>>>>>>>>>>>>>> (Tuple 0,-4074432596329)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> One line is one emitted window count. This is what happens when I remove
>>>>>>>>>>>>>>>>>> the Thread.sleep(1):
>>>>>>>>>>>>>>>>>> (Tuple 0,660676)
>>>>>>>>>>>>>>>>>> (Tuple 0,2553733)
>>>>>>>>>>>>>>>>>> (Tuple 0,3542696)
>>>>>>>>>>>>>>>>>> (Tuple 0,1)
>>>>>>>>>>>>>>>>>> (Tuple 0,1107035)
>>>>>>>>>>>>>>>>>> (Tuple 0,2549491)
>>>>>>>>>>>>>>>>>> (Tuple 0,4100387)
>>>>>>>>>>>>>>>>>> (Tuple 0,-8406583360092)
>>>>>>>>>>>>>>>>>> (Tuple 0,-8406582150743)
>>>>>>>>>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>>>>>>>>>> (Tuple 0,-8406580427190)
>>>>>>>>>>>>>>>>>> (Tuple 0,6847279255682044995)
>>>>>>>>>>>>>>>>>> (Tuple 0,6847279255682044995)
>>>>>>>>>>>>>>>>>> (Tuple 0,-5390528042713628318)
>>>>>>>>>>>>>>>>>> (Tuple 0,-5390528042711551780)
>>>>>>>>>>>>>>>>>> (Tuple 0,-5390528042711551780)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So at some point the pre-reducer seems to go haywire and does not recover
>>>>>>>>>>>>>>>>>> from it. The good thing is that it does produce results now, where the
>>>>>>>>>>>>>>>>>> previous Current/Reduce would simply hang and not produce any output.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <ggab90@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Aljoscha, can you please try the performance test of Current/Reduce
>>>>>>>>>>>>>>>>>>> with the InversePreReducer in PR 856? (If you just call sum, it will
>>>>>>>>>>>>>>>>>>> use an InversePreReducer.) It would be an interesting test, because
>>>>>>>>>>>>>>>>>>> the inverse function optimization really depends on the stream being
>>>>>>>>>>>>>>>>>>> ordered, and I think it has the potential of being faster than
>>>>>>>>>>>>>>>>>>> Next/Reduce, especially if the window size is much larger than the
>>>>>>>>>>>>>>>>>>> slide size.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>>> Gabor
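The following is a minimal sketch of the general inverse-function technique, for readers unfamiliar with it. It is plain Java with a hypothetical class name, not the InversePreReducer from PR 856, and it does not explain the specific numbers reported in this thread. A sliding sum keeps a single running aggregate: it adds each arriving value and subtracts (the inverse of +) the values that fall out of the window. Eviction only looks at the oldest buffered entry, which is correct only if elements arrive in timestamp order. The second run in `main` shows a late element breaking that assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: sliding-window sum with eviction by subtraction (inverse reduce).
final class InverseReduceSum {
    private final long size;
    private final Deque<long[]> buffer = new ArrayDeque<>(); // {timestamp, value}, arrival order
    private long sum;

    InverseReduceSum(long size) {
        this.size = size;
    }

    // Adds an element and returns the sum over (ts - size, ts], assuming ordered input.
    long add(long ts, long value) {
        buffer.addLast(new long[] {ts, value});
        sum += value;                                 // reduce
        while (buffer.peekFirst()[0] <= ts - size) {  // never empties: the new element stays
            sum -= buffer.pollFirst()[1];             // inverse reduce
        }
        return sum;
    }

    public static void main(String[] args) {
        InverseReduceSum ordered = new InverseReduceSum(5);
        long last = 0;
        for (long ts = 0; ts < 10; ts++) {
            last = ordered.add(ts, 1);
        }
        System.out.println("ordered input:      " + last); // 5 (timestamps 5..9)

        InverseReduceSum shuffled = new InverseReduceSum(5);
        for (long ts : new long[] {0, 1, 2, 9, 3, 4, 5, 6, 7, 8}) {
            last = shuffled.add(ts, 1);
        }
        // 7: the late elements 3 and 4 sit behind 9 in the buffer and are not evicted yet
        System.out.println("out-of-order input: " + last);
    }
}
```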
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <aljoscha@apache.org>:
>>>>>>>>>>>>>>>>>>>> I think I'll have to elaborate a bit, so I created a proof-of-concept
>>>>>>>>>>>>>>>>>>>> implementation of my ideas and ran some throughput measurements to
>>>>>>>>>>>>>>>>>>>> alleviate concerns about performance.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> First, though, I want to highlight again why the current approach does not
>>>>>>>>>>>>>>>>>>>> work with out-of-order elements (which, again, occur constantly due to the
>>>>>>>>>>>>>>>>>>>> distributed nature of the system). This is the example I posted earlier:
>>>>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like
>>>>>>>>>>>>>>>>>>>> this:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>>>>>>> | | Source
>>>>>>>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>>>>>>> |
>>>>>>>>>>>>>>>>>>>> +-----+
>>>>>>>>>>>>>>>>>>>> | |
>>>>>>>>>>>>>>>>>>>> | +--+
>>>>>>>>>>>>>>>>>>>> | | | Identity Map
>>>>>>>>>>>>>>>>>>>> | +--+
>>>>>>>>>>>>>>>>>>>> | |
>>>>>>>>>>>>>>>>>>>> +-----+
>>>>>>>>>>>>>>>>>>>> |
>>>>>>>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>>>>>>> | | Window
>>>>>>>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>>>>>>> |
>>>>>>>>>>>>>>>>>>>> |
>>>>>>>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>>>>>>> | | Sink
>>>>>>>>>>>>>>>>>>>> +--+
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So all it does is pass the elements through an identity map and then merge
>>>>>>>>>>>>>>>>>>>> them again before the window operator. The source emits ascending integers,
>>>>>>>>>>>>>>>>>>>> and the window operator has a custom timestamp extractor that uses the
>>>>>>>>>>>>>>>>>>>> integer itself as the timestamp and should create windows of size 4 (that
>>>>>>>>>>>>>>>>>>>> is, elements with timestamps 0-3 are one window, the next are the elements
>>>>>>>>>>>>>>>>>>>> with timestamps 4-7, and so on). Since the topology basically doubles the
>>>>>>>>>>>>>>>>>>>> elements from the source, I would expect to get these windows:
>>>>>>>>>>>>>>>>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
>>>>>>>>>>>>>>>>>>>> Window: 4, 4, 5, 5, 6, 6, 7, 7
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The output is this, however:
>>>>>>>>>>>>>>>>>>>> Window: 0, 1, 2, 3,
>>>>>>>>>>>>>>>>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
>>>>>>>>>>>>>>>>>>>> Window: 8, 9, 10, 11,
>>>>>>>>>>>>>>>>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
>>>>>>>>>>>>>>>>>>>> Window: 16, 17, 18, 19,
>>>>>>>>>>>>>>>>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
>>>>>>>>>>>>>>>>>>>> Window: 24, 25, 26, 27,
>>>>>>>>>>>>>>>>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The reason is that the elements simply arrive out of order. Imagine what
>>>>>>>>>>>>>>>>>>>> would happen if the elements actually arrived with some delay from
>>>>>>>>>>>>>>>>>>>> different operations.
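The sketch below shows the expected behavior for the union example above. It is plain Java with hypothetical names, not the proof-of-concept code. Each element is assigned to a tumbling window of size 4 purely by its own timestamp. The interleaving of the two union inputs is simulated with a shuffle. Because assignment ignores arrival order, the window contents match the expected output whatever the shuffle produces. Deciding *when* a window is complete is not covered here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical sketch: event-time tumbling window assignment, independent of arrival order.
final class EventTimeTumbling {
    static Map<Long, List<Long>> assign(List<Long> arrivals, long size) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : arrivals) {
            long start = ts - Math.floorMod(ts, size); // the element's own timestamp decides
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
        }
        for (List<Long> contents : windows.values()) {
            Collections.sort(contents); // sorted only to make the output readable
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Long> arrivals = new ArrayList<>();
        for (long i = 0; i < 8; i++) {
            arrivals.add(i); // element arriving directly from the source
            arrivals.add(i); // same element arriving via the identity map
        }
        Collections.shuffle(arrivals, new Random(42)); // arbitrary arrival order
        System.out.println(assign(arrivals, 4));
        // {0=[0, 0, 1, 1, 2, 2, 3, 3], 4=[4, 4, 5, 5, 6, 6, 7, 7]}
    }
}
```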
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Now, on to the performance numbers. The proof-of-concept I created is
>>>>>>>>>>>>>>>>>>>> available here:
>>>>>>>>>>>>>>>>>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic
>>>>>>>>>>>>>>>>>>>> idea is that sources assign the current timestamp when emitting elements.
>>>>>>>>>>>>>>>>>>>> They also periodically emit watermarks that tell us that no elements with
>>>>>>>>>>>>>>>>>>>> an earlier timestamp will be emitted. The watermarks propagate through the
>>>>>>>>>>>>>>>>>>>> operators. The window operator looks at the timestamp of an element and
>>>>>>>>>>>>>>>>>>>> puts it into the buffer that corresponds to that window. When the window
>>>>>>>>>>>>>>>>>>>> operator receives a watermark, it will look at the in-flight windows
>>>>>>>>>>>>>>>>>>>> (basically the buffers) and emit those windows whose window-end is
>>>>>>>>>>>>>>>>>>>> before the watermark.
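The mechanism described above can be sketched in plain Java: buffer elements per window by timestamp, then emit a window once a watermark passes its end. The names are hypothetical and the windows are tumbling. A window [start, start + size) is treated as complete when start + size <= watermark. This illustrates the idea only; it is neither the proof-of-concept branch nor Flink's API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a watermark-driven tumbling-window operator.
final class WatermarkWindowOperator {
    private final long size;
    // In-flight windows keyed by window start; TreeMap iterates in start order.
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    WatermarkWindowOperator(long size) {
        this.size = size;
    }

    // Put the element into the buffer of the window its timestamp falls into.
    void onElement(long ts) {
        long start = ts - Math.floorMod(ts, size);
        buffers.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
    }

    // A watermark promises that no element with timestamp < watermark follows,
    // so every window whose end (exclusive) is <= watermark is complete.
    void onWatermark(long watermark) {
        Iterator<Map.Entry<Long, List<Long>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> w = it.next();
            if (w.getKey() + size > watermark) {
                break; // this and all later windows are still open
            }
            emitted.add("[" + w.getKey() + ", " + (w.getKey() + size) + "): " + w.getValue());
            it.remove();
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkWindowOperator op = new WatermarkWindowOperator(4);
        for (long ts : new long[] {0, 1, 4, 2, 3}) {
            op.onElement(ts);
        }
        op.onWatermark(4);  // closes [0, 4)
        for (long ts : new long[] {5, 8, 6, 7}) {
            op.onElement(ts);
        }
        op.onWatermark(10); // closes [4, 8); [8, 12) stays in flight
        op.emitted().forEach(System.out::println);
        // [0, 4): [0, 1, 2, 3]
        // [4, 8): [4, 5, 6, 7]
    }
}
```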
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For measuring throughput I did the following: The source emits tuples of
>>>>>>>>>>>>>>>>>>>> the form ("tuple", 1) in an infinite loop. The window operator sums up the
>>>>>>>>>>>>>>>>>>>> tuples, thereby counting how many tuples the window operator can handle in
>>>>>>>>>>>>>>>>>>>> a given time window. There are two different implementations for the
>>>>>>>>>>>>>>>>>>>> summation: 1) simply summing up the values in a mapWindow(), where you get
>>>>>>>>>>>>>>>>>>>> a List of all tuples and simply iterate over it; 2) using sum(1), which is
>>>>>>>>>>>>>>>>>>>> implemented as a reduce() (that uses the pre-reducer optimisations).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> These are the performance numbers (Current is the current implementation,
>>>>>>>>>>>>>>>>>>>> Next is my proof-of-concept):
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Tumbling (1 sec):
>>>>>>>>>>>>>>>>>>>> - Current/Map: 1.6 mio
>>>>>>>>>>>>>>>>>>>> - Current/Reduce: 2 mio
>>>>>>>>>>>>>>>>>>>> - Next/Map: 2.2 mio
>>>>>>>>>>>>>>>>>>>> - Next/Reduce: 4 mio
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Sliding (5 sec, slide 1 sec):
>>>>>>>>>>>>>>>>>>>> - Current/Map: ca 3 mio (fluctuates a lot)
>>>>>>>>>>>>>>>>>>>> - Current/Reduce: No output
>>>>>>>>>>>>>>>>>>>> - Next/Map: ca 4 mio (fluctuates)
>>>>>>>>>>>>>>>>>>>> - Next/Reduce: 10 mio
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The Next/Reduce variant can basically scale indefinitely with window size
>>>>>>>>>>>>>>>>>>>> because the internal state does not depend on the number of elements (it is
>>>>>>>>>>>>>>>>>>>> just the current sum). The pre-reducer for sliding windows cannot handle
>>>>>>>>>>>>>>>>>>>> the amount of tuples; it produces no output. For the two Map variants the
>>>>>>>>>>>>>>>>>>>> performance fluctuates because they always keep all the elements in an
>>>>>>>>>>>>>>>>>>>> internal buffer before emission; this seems to tax the garbage collector a
>>>>>>>>>>>>>>>>>>>> bit and leads to random pauses.
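This small plain-Java sketch (hypothetical names) shows why the two summation strategies differ in state size. A mapWindow()-style window keeps every element until it fires, so its buffer grows with the number of elements per window. A reduce()-style window folds each element into a single accumulator as it arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: buffered (mapWindow-style) vs. incremental (reduce-style) window state.
final class WindowStateSketch {
    // Keeps all elements and computes on emission: state grows with the element count.
    static final class BufferingWindow {
        private final List<Integer> elements = new ArrayList<>();

        void add(int v) { elements.add(v); }

        long fire() {
            long sum = 0;
            for (int v : elements) {
                sum += v;
            }
            elements.clear();
            return sum;
        }

        int stateSize() { return elements.size(); }
    }

    // Folds each element into one accumulator: constant state per window.
    static final class ReducingWindow {
        private long acc;

        void add(int v) { acc += v; }

        long fire() {
            long result = acc;
            acc = 0;
            return result;
        }
    }

    public static void main(String[] args) {
        BufferingWindow buffering = new BufferingWindow();
        ReducingWindow reducing = new ReducingWindow();
        for (int i = 0; i < 1_000_000; i++) {
            buffering.add(1); // the ("tuple", 1) payload from the throughput test
            reducing.add(1);
        }
        System.out.println("buffered elements before firing: " + buffering.stateSize()); // 1000000
        System.out.println("sums: " + buffering.fire() + " / " + reducing.fire());      // 1000000 / 1000000
    }
}
```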
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> One thing that should be noted is that I had to disable the fake-element
>>>>>>>>>>>>>>>>>>>> emission thread; otherwise the Current versions would deadlock.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So, I started working on this because I thought that out-of-order
>>>>>>>>>>>>>>>>>>>> processing would be necessary for correctness. And it certainly is, but the
>>>>>>>>>>>>>>>>>>>> proof-of-concept also shows that performance can be greatly improved.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <gyula.fora@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I agree, let's separate these topics from each other so we can get a faster
>>>>>>>>>>>>>>>>>>>>> resolution.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> There is already a state discussion in the thread we started with Paris.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <ktzoumas@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I agree with supporting out-of-order out of the box :-), even if this means
>>>>>>>>>>>>>>>>>>>>>> a major refactoring. This is the right time to refactor the streaming API
>>>>>>>>>>>>>>>>>>>>>> before we pull it out of beta. I think that this is more important than new
>>>>>>>>>>>>>>>>>>>>>> features in the streaming API, which can be prioritized once the API is out
>>>>>>>>>>>>>>>>>>>>>> of beta (meaning that IMO this is the right time to stall PRs until we
>>>>>>>>>>>>>>>>>>>>>> agree on the design).
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> There are three sections in the document: windowing, state, and API. How
>>>>>>>>>>>>>>>>>>>>>> intertwined are they? Can we separate the discussion, or do we need to
>>>>>>>>>>>>>>>>>>>>>> discuss them all together? I think part of the difficulty is that we are
>>>>>>>>>>>>>>>>>>>>>> discussing three design choices at once.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <ted.dunning@gmail.com>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Out of order is ubiquitous in the real world. Typically, what happens is
>>>>>>>>>>>>>>>>>>>>>>> that businesses will declare a maximum allowable delay for delayed
>>>>>>>>>>>>>>>>>>>>>>> transactions and will commit to results when that delay is reached.
>>>>>>>>>>>>>>>>>>>>>>> Transactions that arrive later than this cutoff are collected specially as
>>>>>>>>>>>>>>>>>>>>>>> corrections, which are reported/used when possible.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Clearly, ordering can also be violated during processing, but if the data
>>>>>>>>>>>>>>>>>>>>>>> is originally out of order, the situation can't be repaired by any protocol
>>>>>>>>>>>>>>>>>>>>>>> fixes that prevent transactions from becoming disordered; it has to be
>>>>>>>>>>>>>>>>>>>>>>> handled at the data level.
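The policy Ted describes can be sketched in plain Java. All names are hypothetical, and windows are tumbling and counted. A window is committed once the highest timestamp seen is at least the window end plus the declared maximum delay. An element that arrives for an already-committed window is recorded as a correction instead of changing the committed result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: tumbling-window counts with a maximum allowable delay.
final class BoundedDelayCounter {
    private final long windowSize;
    private final long maxDelay;
    private final TreeMap<Long, Long> open = new TreeMap<>();  // window start -> count
    private final Map<Long, Long> committed = new TreeMap<>(); // final results
    private final List<Long> corrections = new ArrayList<>();  // timestamps that came too late
    private long maxSeen = Long.MIN_VALUE;
    private long closedUpTo = Long.MIN_VALUE; // windows ending at or before this are committed

    BoundedDelayCounter(long windowSize, long maxDelay) {
        this.windowSize = windowSize;
        this.maxDelay = maxDelay;
    }

    void onElement(long ts) {
        long start = ts - Math.floorMod(ts, windowSize);
        if (start + windowSize <= closedUpTo) {
            corrections.add(ts); // its window is already committed: report separately
            return;
        }
        open.merge(start, 1L, Long::sum);
        maxSeen = Math.max(maxSeen, ts);
        closedUpTo = maxSeen - maxDelay;
        // Commit every window whose end is at least maxDelay behind the newest timestamp.
        while (!open.isEmpty() && open.firstKey() + windowSize <= closedUpTo) {
            Map.Entry<Long, Long> done = open.pollFirstEntry();
            committed.put(done.getKey(), done.getValue());
        }
    }

    Map<Long, Long> committed() { return committed; }

    List<Long> corrections() { return corrections; }

    public static void main(String[] args) {
        BoundedDelayCounter c = new BoundedDelayCounter(10, 5);
        for (long ts : new long[] {1, 3, 12, 8, 16, 14, 2, 22, 27}) {
            c.onElement(ts);
        }
        System.out.println("committed:   " + c.committed());   // {0=3, 10=3}
        System.out.println("corrections: " + c.corrections()); // [2]
    }
}
```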
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I also don't like big changes, but sometimes they are necessary. The reason
>>>>>>>>>>>>>>>>>>>>>>>> why I'm so adamant about out-of-order processing is that out-of-order
>>>>>>>>>>>>>>>>>>>>>>>> elements are not some exception that occurs once in a while; they occur
>>>>>>>>>>>>>>>>>>>>>>>> constantly in a distributed system. For example, in this:
>>>>>>>>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting windows
>>>>>>>>>>>>>>>>>>>>>>>> are completely bogus because the current windowing system assumes elements
>>>>>>>>>>>>>>>>>>>>>>>> to globally arrive in order, which is simply not true. (The example has a
>>>>>>>>>>>>>>>>>>>>>>>> source that generates increasing integers. Then these pass through a map
>>>>>>>>>>>>>>>>>>>>>>>> and are unioned with the original DataStream before a window operator.)
>>>>>>>>>>>>>>>>>>>>>>>> This simulates elements arriving from different operators at a windowing
>>>>>>>>>>>>>>>>>>>>>>>> operator. The example also uses DOP=1; I imagine this gets worse with
>>>>>>>>>>>>>>>>>>>>>>>> higher DOP.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> What do you mean by costly? As I said, I have a proof-of-concept windowing
>>>>>>>>>>>>>>>>>>>>>>>> operator that can handle out-of-order elements. This is an example using
>>>>>>>>>>>>>>>>>>>>>>>> the current Flink API:
>>>>>>>>>>>>>>>>>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8
>>>>>>>>>>>>>>>>>>>>>>>> (It is an infinite source of tuples and a 5-second window operator that
>>>>>>>>>>>>>>>>>>>>>>>> counts the tuples.) The first problem is that this code deadlocks because
>>>>>>>>>>>>>>>>>>>>>>>> of the thread that emits fake elements. If I disable the fake-element code
>>>>>>>>>>>>>>>>>>>>>>>> it works, but the throughput using my mockup is 4 times higher. The gap
>>>>>>>>>>>>>>>>>>>>>>>> widens dramatically as the window size increases.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> So, it actually increases performance (unless I'm making a mistake in my
>>>>>>>>>>>>>>>>>>>>>>>> explorations) and can handle elements that arrive out of order (which
>>>>>>>>>>>>>>>>>>>>>>>> happens basically always in real-world windowing use cases).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <sewen@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> What I like a lot about Aljoscha's proposed design is that we need no
>>>>>>>>>>>>>>>>>>>>>>>>> different code for "system time" vs. "event time". It only differs in where
>>>>>>>>>>>>>>>>>>>>>>>>> the timestamps are assigned.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> The OOP (out-of-order processing) approach also gives you the semantics of
>>>>>>>>>>>>>>>>>>>>>>>>> total ordering without imposing merges on the streams.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J.
>> Sax <
>>>>>>>>>>>>>>>>>>>>>>>>> mjsax@informatik.hu-berlin.de> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I agree that there should be multiple
>> alternatives
>>>> the
>>>>>>>>>>>>>>>>> user(!)
>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>>>> choose from. Partial out-of-order processing
>> works
>>>> for
>>>>>>>>>>>>>>>>>>> many/most
>>>>>>>>>>>>>>>>>>>>>>>>>> aggregates. However, if you consider
>>>>>>>>>>>>>>>>> Event-Pattern-Matching,
>>>>>>>>>>>>>>>>>>>> global
>>>>>>>>>>>>>>>>>>>>>>>>>> ordering in necessary (even if the performance
>>>> penalty
>>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>> high).
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I would also keep "system-time windows" as an
>>>>>>>>>>>>>>>>>>>>>>>>>> alternative to "source assigned ts-windows".
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> It might also be interesting to consider the
>>>>>>>>>>>>>>>>>>>>>>>>>> following paper for overlapping windows: "Resource
>>>>>>>>>>>>>>>>>>>>>>>>>> sharing in continuous sliding-window aggregates"
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> I think we should not block PRs unnecessarily if
>>>>>>>>>>>>>>>>>>>>>>>>>>> your suggested changes might touch them at some point.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Also I still think we should not put everything in
>>>>>>>>>>>>>>>>>>>>>>>>>>> the DataStream because it will be a huge mess.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Also we need to agree on the out-of-order processing,
>>>>>>>>>>>>>>>>>>>>>>>>>>> whether we want it the way you proposed it (which is
>>>>>>>>>>>>>>>>>>>>>>>>>>> quite costly). Another alternative approach there,
>>>>>>>>>>>>>>>>>>>>>>>>>>> which fits in the current windowing, is to filter out
>>>>>>>>>>>>>>>>>>>>>>>>>>> out-of-order events and apply a special handling
>>>>>>>>>>>>>>>>>>>>>>>>>>> operator on them. This would be fairly lightweight.
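A rough Python sketch of this lighter-weight alternative. The rule used to decide what counts as "out of order" is an assumption, not something stated in the thread: an element is treated as late if its timestamp is below the highest timestamp seen so far. In-order elements would continue to the existing windowing, and late ones would be routed to a separate handling operator. `split_late` and the tuple layout are hypothetical.

```python
from typing import Iterable, List, Optional, Tuple

# Hypothetical element layout: (timestamp, payload)
Element = Tuple[int, str]


def split_late(elements: Iterable[Element]) -> Tuple[List[Element], List[Element]]:
    """Split a stream into in-order elements and late elements.

    Assumption: an element is late if its timestamp is lower than the
    maximum timestamp seen so far; equal timestamps count as in order.
    """
    in_order: List[Element] = []
    late: List[Element] = []
    max_ts: Optional[int] = None
    for element in elements:
        ts = element[0]
        if max_ts is not None and ts < max_ts:
            late.append(element)      # would go to the special handling operator
        else:
            in_order.append(element)  # can be fed to the existing in-order windowing
            max_ts = ts
    return in_order, late
```

The only state is one timestamp per stream, which is what makes this cheap compared with buffering and reordering every element.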
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> My point is that we need to consider some alternative
>>>>>>>>>>>>>>>>>>>>>>>>>>> solutions. And we should not block contributions
>>>>>>>>>>>>>>>>>>>>>>>>>>> along the way.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek
>>>>>>>>>>>>>>>>>>>>>>>>>>> <aljoscha@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> The reason I posted this now is that we need to think
>>>>>>>>>>>>>>>>>>>>>>>>>>>> about the API and windowing before proceeding with the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> PRs of Gabor (inverse reduce) and Gyula (removal of
>>>>>>>>>>>>>>>>>>>>>>>>>>>> "aggregate" functions on DataStream).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the windowing, I think that the current model does
>>>>>>>>>>>>>>>>>>>>>>>>>>>> not work for out-of-order processing. Therefore, the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> whole windowing infrastructure will basically have to
>>>>>>>>>>>>>>>>>>>>>>>>>>>> be redone. This also means that any work on the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> pre-aggregators or optimizations that we do now
>>>>>>>>>>>>>>>>>>>>>>>>>>>> becomes useless.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the API, I proposed to restructure the interactions
>>>>>>>>>>>>>>>>>>>>>>>>>>>> between all the different *DataStream classes and
>>>>>>>>>>>>>>>>>>>>>>>>>>>> grouping/windowing. (See the API section of the doc I
>>>>>>>>>>>>>>>>>>>>>>>>>>>> posted.)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <gyula.fora@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the nice summary, this is a very good
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> initiative.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I added some comments to the respective sections
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (where I didn't fully agree :)).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> At some point I think it would be good to have a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public hangout session on this, which could make for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a more dynamic discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek <aljoscha@apache.org> wrote (on Mon,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 22 Jun 2015, 21:34):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with people proposing changes to the streaming part,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I also wanted to throw my hat into the ring. :D
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> During the last few months, while I was getting
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> acquainted with the streaming system, I wrote down
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> some thoughts I had about how things could be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> improved. Hopefully, they are in somewhat coherent
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> shape now, so please have a look if you are
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> interested in this:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This mostly covers:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Timestamps assigned at sources
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Out-of-order processing of elements in window operators
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - API design
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Please let me know what you think. Comment in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> document or here on the mailing list.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have a PR in the making that would introduce source
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> timestamps and watermarks for keeping track of them.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I also hacked a proof-of-concept of a windowing system
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that is able to process out-of-order elements using a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FlatMap operator. (It uses panes to perform efficient
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> pre-aggregations.)
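The pane idea mentioned above can be illustrated roughly as follows. For a sliding window of length `size` that advances by `slide`, time is cut into non-overlapping panes of length gcd(size, slide). Each element is pre-aggregated once into its pane, and every window is then assembled from the partial results of the panes it covers, instead of adding each element to every overlapping window. This Python sketch assumes integer timestamps, window starts aligned to multiples of `slide`, and a sum aggregate. `sliding_sums_with_panes` is a hypothetical name; this is not the code of the proof-of-concept.

```python
from collections import defaultdict
from math import gcd
from typing import Dict, Iterable, Tuple


def sliding_sums_with_panes(elements: Iterable[Tuple[int, int]],
                            size: int, slide: int) -> Dict[int, int]:
    """Sum of values for each sliding window [start, start + size).

    Window starts are multiples of `slide`. Elements are (timestamp, value)
    pairs and may arrive in any order, since each one only updates its pane.
    """
    pane = gcd(size, slide)

    # Step 1: pre-aggregate every element exactly once into its pane.
    pane_sums: Dict[int, int] = defaultdict(int)
    for ts, value in elements:
        pane_sums[ts - ts % pane] += value
    if not pane_sums:
        return {}

    # Step 2: build each window from the partial sums of the panes it covers.
    lo = min(pane_sums) - size + 1  # earliest start whose window reaches the first pane
    start = lo + (-lo) % slide      # round up to a multiple of `slide`
    last_pane = max(pane_sums)
    windows: Dict[int, int] = {}
    while start <= last_pane:
        covered = [pane_sums[p]
                   for p in range(start, start + size, pane) if p in pane_sums]
        if covered:  # skip windows that contain no elements
            windows[start] = sum(covered)
        start += slide
    return windows
```

With `size=10` and `slide=5`, the pane length is 5, so each window combines two pane sums. Windows that start before the first element (a negative start in small examples) are included because they overlap it.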
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
> 

