flink-dev mailing list archives

From "Matthias J. Sax" <mj...@informatik.hu-berlin.de>
Subject Re: Rework of the window-join semantics
Date Wed, 08 Apr 2015 13:46:32 GMT
I started to work on an in-memory merge on a record-timestamp attribute
for totally ordered streams, but I got distracted by the Storm
compatibility layer... I will continue to work on it when I find some
extra time ;)
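
A minimal sketch of what such an in-memory merge could look like, assuming
each incoming channel is already ordered by timestamp (the FIFO property
described further down in this thread). The record layout (timestamp,
payload) and the function name are illustrative assumptions, not Flink code.

```python
import heapq
from typing import Iterable, Iterator, Tuple

# (timestamp, payload); this record layout is an assumption for illustration.
Record = Tuple[int, str]


def merge_by_timestamp(*channels: Iterable[Record]) -> Iterator[Record]:
    """Merge channels that are each already ordered by timestamp.

    heapq.merge holds one pending record per channel and always yields
    the record with the smallest timestamp next. It must pull the next
    record from a channel before it can continue, which is the
    head-of-the-line waiting discussed in this thread.
    Records with equal timestamps are emitted in channel order.
    """
    return heapq.merge(*channels, key=lambda record: record[0])


channel_a = [(1, "a1"), (4, "a2"), (7, "a3")]
channel_b = [(2, "b1"), (3, "b2"), (9, "b3")]
merged = list(merge_by_timestamp(channel_a, channel_b))
print(merged)
```

With real network channels the inputs would be blocking iterators, so a
slow channel stalls the whole merge; punctuations are one way to bound
that wait.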

On 04/08/2015 03:18 PM, Márton Balassi wrote:
> +1 for Stephan's suggestion.
> 
> If we would like to support event time and also sorting inside a window we
> should carefully consider where to actually put the timestamp of the
> records. If the timestamp is part of the record then it is more
> straightforward, but if we assign the timestamps in our sources,
> the initial idea was to keep them hidden from the user and only use them in
> the network layer.
> 
> The more extreme solution with total ordering has a JIRA, but has been a
> bit silent lately. [1]
> 
> [1] https://issues.apache.org/jira/browse/FLINK-1493?
> 
> On Wed, Apr 8, 2015 at 3:06 PM, Stephan Ewen <sewen@apache.org> wrote:
> 
>> With the current network layer and the agenda we have for windowing, we
>> should be able to support windows on event time in the near future.
>> Inside the window, you can sort all records by time and have a full
>> ordering. That is independent of the order of the stream.
>>
>> How about this as a first goal?
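
A rough sketch of this first goal, assuming tumbling windows on event time
and a punctuation that promises no later event carries a smaller timestamp.
The class and method names are hypothetical, not part of any Flink API.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# (event_time, payload); this layout is an assumption for illustration.
Event = Tuple[int, str]


class PunctuatedWindows:
    """Tumbling event-time windows that are closed by punctuations."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.open_windows: Dict[int, List[Event]] = defaultdict(list)

    def on_event(self, event: Event) -> None:
        # Assign by event time; arrival order does not matter.
        window_start = (event[0] // self.size) * self.size
        self.open_windows[window_start].append(event)

    def on_punctuation(self, time: int) -> List[Tuple[int, List[Event]]]:
        """Close every window that ends at or before `time`.

        Assumes the punctuation is honoured: no later event has
        event_time < time. Closed windows are returned in window order,
        each with its events sorted by event time.
        """
        closed = []
        for start in sorted(self.open_windows):
            if start + self.size <= time:
                events = self.open_windows.pop(start)
                closed.append((start, sorted(events, key=lambda e: e[0])))
        return closed


windows = PunctuatedWindows(size=10)
for event in [(3, "c"), (1, "a"), (12, "x"), (7, "b")]:
    windows.on_event(event)
first = windows.on_punctuation(10)
second = windows.on_punctuation(20)
print(first, second)
```

The stream itself stays unordered; ordering is only established inside
each window, at the moment the window is closed.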
>>
>> On Wed, Apr 8, 2015 at 2:50 PM, Bruno Cadonna <
>> cadonna@informatik.hu-berlin.de> wrote:
>>
> Hi Stephan,
> 
> how much of CEP depends on fully ordered streams depends on the
> operators that you use in the pattern query. But in general, they need
> fully ordered events within a window or at least some strategies to
> deal with out-of-order events.
> 
> If I got it right, you propose windows that are built on the
> occurrence time of the event, i.e., the time when the event occurred
> in the real world, and then to close a window when a punctuation says
> that the window is complete.
> 
> I agree with you that with such windows, you can do a lot and decrease
> the maintenance overhead. For example, some aggregations like sum,
> count, avg do not need ordered events in the window; they just need
> complete windows that are ordered with respect to each other to be
> deterministic. However, if the windows overlap, you again need the time
> order to evict the oldest events from a window.
> 
> Cheers,
> Bruno
> 
> 
> On 08.04.2015 13:30, Stephan Ewen wrote:
>>>>> I agree, any ordering guarantees would need to be actively
>>>>> enabled.
>>>>>
>>>>> How much of CEP depends on fully ordered streams? There is a lot
>>>>> you can do with windows on event time, which are triggered by
>>>>> punctuations.
>>>>>
>>>>> This is like a "soft" variant of the ordered streams, where the order
>>>>> relation holds only between windows, rather than between all
>>>>> events. That makes it much cheaper to maintain.
>>>>>
>>>>> On Wed, Apr 8, 2015 at 1:19 PM, Matthias J. Sax <
>>>>> mjsax@informatik.hu-berlin.de> wrote:
>>>>>
>>>>>> This reasoning makes absolute sense. That's why I suggested
>>>>>> that the user should actively choose ordered data processing...
>>>>>>
>>>>>> About deadlocks: Those can be avoided if the buffers are
>>>>>> consumed continuously into an in-memory merge buffer (maybe with
>>>>>> spilling to disk if necessary). Of course, latency might suffer,
>>>>>> and punctuations should be used to mitigate this problem.
>>>>>>
>>>>>> As far as I understood, CEP might be an interesting use case for
>>>>>> Flink, and it depends on ordered data streams.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 04/08/2015 11:50 AM, Stephan Ewen wrote:
>>>>>>> Here is the state in Flink and why we have chosen not to do
>>>>>>> global ordering at the moment:
>>>>>>>
>>>>>>> - Individual streams are FIFO, which means that if the sender
>>>>>>> emits in order, the receiver receives in order.
>>>>>>>
>>>>>>> - When streams are combined (think shuffle / partition-by), the
>>>>>>> streams are not merged in order; buffers from the streams are
>>>>>>> taken as they come in.
>>>>>>>
>>>>>>> - We had a version that merged streams (for sort merging in
>>>>>>> batch programs, actually) long ago, and it either performed
>>>>>>> horribly or deadlocked. The reason is that all streams are
>>>>>>> stalled whenever a buffer is missing from one stream, since
>>>>>>> the merge cannot continue in such a case (head-of-the-line
>>>>>>> waiting). That backpressures streams unnecessarily, slowing
>>>>>>> down computation. If the streams depend mutually on each other
>>>>>>> (think two partitioning steps), they frequently deadlock
>>>>>>> completely.
>>>>>>>
>>>>>>> - The only way to do global ordering is by
>>>>>>> stalling/buffering/punctuating streams continuously, which is a
>>>>>>> lot of work to implement and will definitely cost performance.
>>>>>>>
>>>>>>> Therefore we have decided to go for a simpler model without
>>>>>>> global ordering for now. If we start seeing that this has severe
>>>>>>> limitations in practice, we may reconsider that.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Apr 8, 2015 at 11:25 AM, Bruno Cadonna <
>>>>>>> cadonna@informatik.hu-berlin.de> wrote:
>>>>>>>
>>>>>>> Hi Paris,
>>>>>>>
>>>>>>> what's the reason for not guaranteeing global ordering across
>>>>>>> partitions in the stream model? Is it the smaller overhead or
>>>>>>> are there any operations not computable in a distributed
>>>>>>> environment with global ordering?
>>>>>>>
>>>>>>> In any case, I agree with Matthias that the user should choose.
>>>>>>> If operations were not computable with a global ordering, I
>>>>>>> would restrict the set of operations for that mode.
>>>>>>>
>>>>>>> Maybe, it would also be helpful to collect use cases for each
>>>>>>> of the modes proposed by Matthias to understand the
>>>>>>> requirements for both modes.
>>>>>>>
>>>>>>> Some (researchy) thoughts about indeterminism: How can the
>>>>>>> indeterminism of the current setting be quantified? How "large"
>>>>>>> can it grow with the current setting? Are there any limits that
>>>>>>> can be guaranteed?
>>>>>>>
>>>>>>> Cheers, Bruno
>>>>>>>
>>>>>>> On 07.04.2015 12:38, Paris Carbone wrote:
>>>>>>>>>> Hello Matthias,
>>>>>>>>>>
>>>>>>>>>> Sure, ordering guarantees are indeed a tricky thing; I
>>>>>>>>>> recall having that discussion back in TU Berlin. Bear in
>>>>>>>>>> mind, though, that DataStream, our abstract data type,
>>>>>>>>>> represents a *partitioned* unbounded sequence of events.
>>>>>>>>>> There are no *global* ordering guarantees made whatsoever
>>>>>>>>>> in that model across partitions. If you see it more
>>>>>>>>>> generally, there are many “race conditions” in a
>>>>>>>>>> distributed execution graph of vertices that process
>>>>>>>>>> multiple inputs asynchronously, especially when you add
>>>>>>>>>> joins and iterations into the mix (how do you deal with
>>>>>>>>>> reprocessing “old” tuples that iterate in the graph?). Btw,
>>>>>>>>>> have you checked the Naiad paper [1]? Stephan cited it a
>>>>>>>>>> while ago and it is quite relevant to that discussion.
>>>>>>>>>>
>>>>>>>>>> Also, can you cite the paper with the joining semantics
>>>>>>>>>> you are referring to? That would be a great help, I
>>>>>>>>>> think.
>>>>>>>>>>
>>>>>>>>>> Paris
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
>>>>>>>>>>
>>>>>>>>>> On 07 Apr 2015, at 11:50, Matthias J. Sax
>>>>>>>>>> <mjsax@informatik.hu-berlin.de> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi @all,
>>>>>>>>>>
>>>>>>>>>> please keep me in the loop for this work. I am highly
>>>>>>>>>> interested and I want to help on it.
>>>>>>>>>>
>>>>>>>>>> My initial thoughts are as follows:
>>>>>>>>>>
>>>>>>>>>> 1) Currently, system timestamps are used and the
>>>>>>>>>> suggested approach can be seen as state-of-the-art (there
>>>>>>>>>> is actually a research paper using the exact same join
>>>>>>>>>> semantics). Of course, the current approach is inherently
>>>>>>>>>> non-deterministic. The advantage is that there is no
>>>>>>>>>> overhead in keeping track of the order of records, and the
>>>>>>>>>> latency should be very low. (Additionally, state recovery
>>>>>>>>>> is simplified: because the processing is inherently
>>>>>>>>>> non-deterministic, recovery can be done with relaxed
>>>>>>>>>> guarantees.)
>>>>>>>>>>
>>>>>>>>>> 2) The user should be able to "switch on" deterministic
>>>>>>>>>> processing, i.e., records are timestamped (either
>>>>>>>>>> externally when generated, or at the sources). Because
>>>>>>>>>> deterministic processing adds some overhead, the user
>>>>>>>>>> should opt into it actively. In this case, the order must
>>>>>>>>>> be preserved in each re-distribution step (merging is
>>>>>>>>>> sufficient if order is preserved within each incoming
>>>>>>>>>> channel). Furthermore, deterministic processing can be
>>>>>>>>>> achieved by sound window semantics (and there is a bunch
>>>>>>>>>> of them). Even for single-stream windows it's a tricky
>>>>>>>>>> problem; for join windows it's even harder. From my point
>>>>>>>>>> of view, it is less important which semantics are chosen;
>>>>>>>>>> however, the user must be aware of how it works. The
>>>>>>>>>> trickiest part of deterministic processing is dealing
>>>>>>>>>> with duplicate timestamps (which cannot be avoided). The
>>>>>>>>>> timestamping of (intermediate) result tuples is also an
>>>>>>>>>> important question to be answered.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 04/07/2015 11:37 AM, Gyula Fóra wrote:
>>>>>>>>>>
>>>>>>>>>> Hey,
>>>>>>>>>>
>>>>>>>>>> I agree with Kostas: if we define the exact semantics of
>>>>>>>>>> how this works, this is not more ad-hoc than any other
>>>>>>>>>> stateful operator with multiple inputs. (And I don't
>>>>>>>>>> think any other system supports something similar.)
>>>>>>>>>>
>>>>>>>>>> We need to make some design choices that are similar to
>>>>>>>>>> the issues we had for windowing. We need to choose how we
>>>>>>>>>> want to evaluate the windowing policies (global or local)
>>>>>>>>>> because that affects what kind of policies can be
>>>>>>>>>> parallel, but I can work on these things.
>>>>>>>>>>
>>>>>>>>>> I think this is an amazing feature, so I wouldn't
>>>>>>>>>> necessarily rush the implementation for 0.9 though.
>>>>>>>>>>
>>>>>>>>>> And thanks for helping to write these down.
>>>>>>>>>>
>>>>>>>>>> Gyula
>>>>>>>>>>
>>>>>>>>>> On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas
>>>>>>>>>> <ktzoumas@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> Yes, we should write these semantics down. I volunteer to
>>>>>>>>>> help.
>>>>>>>>>>
>>>>>>>>>> I don't think that this is very ad-hoc. The semantics are
>>>>>>>>>> basically the following. Assuming an arriving element
>>>>>>>>>> from the left side:
>>>>>>>>>> (1) We find the right-side matches
>>>>>>>>>> (2) We insert the left-side arrival into the left window
>>>>>>>>>> (3) We recompute the left window
>>>>>>>>>> We need to see whether right window re-computation needs
>>>>>>>>>> to be triggered as well. I think that this way of joining
>>>>>>>>>> streams is also what the symmetric hash join algorithms
>>>>>>>>>> were meant to support.
>>>>>>>>>>
>>>>>>>>>> Kostas
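
The three steps above can be sketched as follows, assuming a count-based
window of the last N elements on each side and equality on a join key.
All names are hypothetical; this is not the prototype mentioned in the
thread, and a symmetric hash join would index each window by key instead
of scanning it.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple

# (join_key, value); this layout is an assumption for illustration.
Element = Tuple[str, int]


class ElementWindowJoin:
    """Joins each arriving element against the other side's current window."""

    def __init__(self, window_size: int) -> None:
        self.windows: Dict[str, Deque[Element]] = {
            "left": deque(maxlen=window_size),
            "right": deque(maxlen=window_size),
        }

    def on_element(self, side: str, element: Element) -> List[Tuple[Element, Element]]:
        other = "right" if side == "left" else "left"
        # (1) Find the matches in the current window of the other side.
        #     Result pairs are always ordered (left, right).
        matches = [
            (element, candidate) if side == "left" else (candidate, element)
            for candidate in self.windows[other]
            if candidate[0] == element[0]
        ]
        # (2) Insert the arrival into its own window and
        # (3) recompute that window: deque(maxlen=...) evicts the oldest.
        self.windows[side].append(element)
        return matches


join = ElementWindowJoin(window_size=2)
r1 = join.on_element("left", ("k", 1))
r2 = join.on_element("right", ("k", 10))
r3 = join.on_element("left", ("k", 2))
r4 = join.on_element("left", ("j", 3))    # evicts ("k", 1) from the left window
r5 = join.on_element("right", ("k", 20))
print(r1, r2, r3, r4, r5)
```

The output depends on the arrival order of the two inputs, which is the
non-determinism raised elsewhere in this thread.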
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen
>>>>>>>>>> <sewen@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> Is the approach of joining an element at a time from one
>>>>>>>>>> input against a window on the other input not a bit
>>>>>>>>>> arbitrary?
>>>>>>>>>>
>>>>>>>>>> This just joins whatever currently happens to be in the
>>>>>>>>>> window by the time the single element arrives - that is a
>>>>>>>>>> bit non-predictable, right?
>>>>>>>>>>
>>>>>>>>>> As a more general point: The whole semantics of windowing
>>>>>>>>>> and when windows are triggered are a bit ad-hoc now. It
>>>>>>>>>> would be really good to start formalizing that a bit and
>>>>>>>>>> put it down somewhere. Users need to be able to clearly
>>>>>>>>>> understand and predict the output.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra
>>>>>>>>>> <gyula.fora@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> I think it should be possible to make this compatible
>>>>>>>>>> with the .window().every() calls. Maybe if there is some
>>>>>>>>>> trigger set in "every" we would not join that stream 1 by
>>>>>>>>>> 1 but every so many elements. The problem here is that
>>>>>>>>>> the window and every in this case are very, very different
>>>>>>>>>> from the normal windowing semantics. The window would
>>>>>>>>>> define the join window for each element of the other
>>>>>>>>>> stream, while every would define how often I join this
>>>>>>>>>> stream with the other one.
>>>>>>>>>>
>>>>>>>>>> We need to think about how to make this intuitive.
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi
>>>>>>>>>> <balassi.marton@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> That would be really neat. The problem I see there is that
>>>>>>>>>> we do not distinguish between dataStream.window() and
>>>>>>>>>> dataStream.window().every() currently: they both return
>>>>>>>>>> WindowedDataStreams, and the TriggerPolicies of the every
>>>>>>>>>> call do not make much sense in this setting (in fact,
>>>>>>>>>> practically the trigger is always set to a count of one).
>>>>>>>>>>
>>>>>>>>>> But of course we could do it such that we check that the
>>>>>>>>>> eviction is either null or a count of 1, and in every
>>>>>>>>>> other case throw an exception while building the
>>>>>>>>>> JobGraph.
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek
>>>>>>>>>> <aljoscha@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> Or you could define it like this:
>>>>>>>>>>
>>>>>>>>>> stream_A = a.window(...)
>>>>>>>>>> stream_B = b.window(...)
>>>>>>>>>>
>>>>>>>>>> stream_A.join(stream_B).where().equals().with()
>>>>>>>>>>
>>>>>>>>>> So a join would just be a join of two
>>>>>>>>>> WindowedDataStreams. This would neatly move the windowing
>>>>>>>>>> stuff into one place.
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi
>>>>>>>>>> <balassi.marton@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Big +1 for the proposal from Peter and Gyula. I'm really for
>>>>>>>>>> bringing the windowing and window join API in sync.
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra
>>>>>>>>>> <gyfora@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> Hey guys,
>>>>>>>>>>
>>>>>>>>>> As Aljoscha highlighted earlier, the current window
>>>>>>>>>> join semantics in the streaming API do not follow the
>>>>>>>>>> changes in the windowing API. More precisely, we
>>>>>>>>>> currently only support joins over time windows of equal
>>>>>>>>>> size on both streams. The reason for this is that we now
>>>>>>>>>> take a window of each of the two streams and do joins
>>>>>>>>>> over these pairs. This would be a blocking operation if
>>>>>>>>>> the windows are not closed at exactly the same time (and
>>>>>>>>>> since we don't want this, we only allow time windows).
>>>>>>>>>>
>>>>>>>>>> I talked with Peter, who came up with the initial idea of
>>>>>>>>>> an alternative approach for stream joins which works as
>>>>>>>>>> follows:
>>>>>>>>>>
>>>>>>>>>> Instead of pairing windows for joins, we do element
>>>>>>>>>> against window joins. What this means is that whenever we
>>>>>>>>>> receive an element from one of the streams, we join this
>>>>>>>>>> element with the current window (which is constantly
>>>>>>>>>> updated) of the other stream. This is non-blocking for any
>>>>>>>>>> window definition, as we don't have to wait for windows to
>>>>>>>>>> be completed, and we can use this with any of our
>>>>>>>>>> predefined policies like Time.of(...), Count.of(...),
>>>>>>>>>> Delta.of(...).
>>>>>>>>>>
>>>>>>>>>> Additionally, this allows a very flexible way of
>>>>>>>>>> defining window joins. With this we could also define
>>>>>>>>>> grouped windowing inside of a join. An example of this
>>>>>>>>>> would be: Join all elements of Stream1 with the last 5
>>>>>>>>>> elements by a given windowkey of Stream2 on some join
>>>>>>>>>> key.
>>>>>>>>>>
>>>>>>>>>> This feature can be easily implemented over the current
>>>>>>>>>> operators, so I already have a working prototype for the
>>>>>>>>>> simple non-grouped case. My only concern is the API; the
>>>>>>>>>> best thing I could come up with is something like this:
>>>>>>>>>>
>>>>>>>>>> stream_A.join(stream_B)
>>>>>>>>>>     .onWindow(windowDefA, windowDefB)
>>>>>>>>>>     .by(windowKey1, windowKey2)
>>>>>>>>>>     .where(...).equalTo(...).with(...)
>>>>>>>>>>
>>>>>>>>>> (the user can omit the "by" and "with" calls)
>>>>>>>>>>
>>>>>>>>>> I think this new approach would be worthy of our
>>>>>>>>>> "flexible windowing" in contrast with the current
>>>>>>>>>> approach.
>>>>>>>>>>
>>>>>>>>>> Regards, Gyula

