flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Rework of the window-join semantics
Date Fri, 24 Apr 2015 14:07:14 GMT
There is a simple reason for that: They don't support joins. :D

They support n-ary co-group, however. This is implemented using
tagging and a group-by-key operation. So only elements in the same
window can end up in the same co-grouped result.
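
A minimal sketch of that idea in plain Python (this is not the Dataflow
API; `assign_window`, `co_group`, and the fixed-size window assignment are
illustrative assumptions): every record is tagged with the index of its
input, and all records are then grouped by (key, window), so elements from
different windows can never meet.

```python
from collections import defaultdict


def assign_window(timestamp, size):
    """Return the start of the fixed-size window that contains timestamp."""
    return timestamp - (timestamp % size)


def co_group(inputs, window_size):
    """Co-group n inputs of (key, timestamp, value) records.

    Each record is tagged with the index of its input; all records are then
    grouped by (key, window start). The result maps (key, window_start) to a
    list holding one list of values per input.
    """
    n = len(inputs)
    groups = defaultdict(lambda: [[] for _ in range(n)])
    for tag, records in enumerate(inputs):
        for key, timestamp, value in records:
            window = assign_window(timestamp, window_size)
            groups[(key, window)][tag].append(value)
    return dict(groups)


left = [("a", 1, "l1"), ("a", 12, "l2")]
right = [("a", 3, "r1"), ("b", 4, "r2")]
result = co_group([left, right], window_size=10)
```

Here "l1" and "r1" share key "a" and window 0 and end up in the same group,
while "l2" falls into window 10 and is grouped without a right-side partner.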

On Fri, Apr 24, 2015 at 3:51 PM, Matthias J. Sax
<mjsax@informatik.hu-berlin.de> wrote:
> Interesting read. Thanks for the pointer.
>
> Take home message (in my understanding):
>   - they support wall-clock, attribute-ts, and count windows
>   -> default is attribute-ts (and not wall-clock as in Flink)
>   -> it is not specified, if a global order is applied to windows, but I
> doubt it, because of their Watermark approach
>   - they allow the user to assign timestamps for attribute-ts windows
>   - they deal with out-of-order data (-> not sure what the last sentence
> means exactly: "...causing the late elements to be emitted as they
> arrive." ?)
>   - their "Watermark" approach might yield high latencies
>
> However, they don't talk about joins... :(
>
>
>
>
> On 04/24/2015 02:25 PM, Aljoscha Krettek wrote:
>> Did anyone read these:
>> https://cloud.google.com/dataflow/model/windowing,
>> https://cloud.google.com/dataflow/model/triggers ?
>>
>> The semantics seem very straightforward and I'm sure the google guys
>> spent some time thinking this through. :D
>>
>> On Mon, Apr 20, 2015 at 3:43 PM, Stephan Ewen <sewen@apache.org> wrote:
>>> Perfect! I am eager to see what you came up with!
>>>
>>> On Sat, Apr 18, 2015 at 2:00 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:
>>>
>>>> Hey all,
>>>>
>>>> We have spent some time with Asterios, Paris and Jonas to finalize the
>>>> windowing semantics (both the current features and the window join), and I
>>>> think we have come up with a very clear picture.
>>>>
>>>> We will write down the proposed semantics and publish it to the wiki next
>>>> week.
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>> On Thu, Apr 16, 2015 at 5:50 PM, Asterios Katsifodimos <
>>>> asterios.katsifodimos@tu-berlin.de> wrote:
>>>>
>>>>> As far as I see in [1], Peter's/Gyula's suggestion is what Infosphere
>>>>> Streams does: symmetric hash join.
>>>>>
>>>>> From [1]:
>>>>> "When a tuple is received on an input port, it is inserted into the window
>>>>> corresponding to the input port, which causes the window to trigger. As
>>>>> part of the trigger processing, the tuple is compared against all tuples
>>>>> inside the window of the opposing input port. If the tuples match, then an
>>>>> output tuple will be produced for each match. If at least one output was
>>>>> generated, a window punctuation will be generated after all the outputs."
>>>>>
>>>>> Cheers,
>>>>> Asterios
>>>>>
>>>>> [1]
>>>>> http://www-01.ibm.com/support/knowledgecenter/#!/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/join.html
>>>>>
>>>>>
>>>>> On Thu, Apr 9, 2015 at 1:30 PM, Matthias J. Sax <
>>>>> mjsax@informatik.hu-berlin.de> wrote:
>>>>>
>>>>>> Hi Paris,
>>>>>>
>>>>>> thanks for the pointer to the Naiad paper. That is quite interesting.
>>>>>>
>>>>>> The paper I mentioned [1] does not describe the semantics in detail; it
>>>>>> is more about the implementation of the stream joins. However, it uses
>>>>>> the same semantics (from my understanding) as proposed by Gyula.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> [1] Kang, Naughton, Viglas. "Evaluating Window Joins over Unbounded
>>>>>> Streams". VLDB 2002.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 04/07/2015 12:38 PM, Paris Carbone wrote:
>>>>>>> Hello Matthias,
>>>>>>>
>>>>>>> Sure, ordering guarantees are indeed a tricky thing; I recall having
>>>>>>> that discussion back at TU Berlin. Bear in mind, though, that DataStream,
>>>>>>> our abstract data type, represents a *partitioned* unbounded sequence of
>>>>>>> events. There are no *global* ordering guarantees made whatsoever in that
>>>>>>> model across partitions. If you see it more generally, there are many
>>>>>>> “race conditions” in a distributed execution graph of vertices that
>>>>>>> process multiple inputs asynchronously, especially when you add joins and
>>>>>>> iterations into the mix (how do you deal with reprocessing “old” tuples
>>>>>>> that iterate in the graph?). Btw, have you checked the Naiad paper [1]?
>>>>>>> Stephan cited it a while ago and it is quite relevant to that discussion.
>>>>>>>
>>>>>>> Also, can you cite the paper with the joining semantics you are
>>>>>>> referring to? That would be of good help I think.
>>>>>>>
>>>>>>> Paris
>>>>>>>
>>>>>>> [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
>>>>>>>
>>>>>>> On 07 Apr 2015, at 11:50, Matthias J. Sax <mjsax@informatik.hu-berlin.de
>>>>>>> <mailto:mjsax@informatik.hu-berlin.de>> wrote:
>>>>>>>
>>>>>>> Hi @all,
>>>>>>>
>>>>>>> please keep me in the loop for this work. I am highly interested and I
>>>>>>> want to help on it.
>>>>>>>
>>>>>>> My initial thoughts are as follows:
>>>>>>>
>>>>>>> 1) Currently, system timestamps are used and the suggested approach can
>>>>>>> be seen as state-of-the-art (there is actually a research paper using
>>>>>>> the exact same join semantics). Of course, the current approach is
>>>>>>> inherently non-deterministic. The advantage is that there is no
>>>>>>> overhead in keeping track of the order of records and the latency should
>>>>>>> be very low. (Additionally, state recovery is simplified: because the
>>>>>>> processing is inherently non-deterministic, recovery can be done with
>>>>>>> relaxed guarantees.)
>>>>>>>
>>>>>>> 2) The user should be able to "switch on" deterministic processing,
>>>>>>> i.e., records are timestamped (either externally when generated, or
>>>>>>> at the sources). Because deterministic processing adds some
>>>>>>> overhead, the user should opt in to it actively.
>>>>>>> In this case, the order must be preserved in each re-distribution step
>>>>>>> (merging is sufficient if order is preserved within each incoming
>>>>>>> channel). Furthermore, deterministic processing can be achieved by sound
>>>>>>> window semantics (and there is a bunch of them). Even for
>>>>>>> single-stream windows it's a tricky problem; for join windows it's even
>>>>>>> harder. From my point of view, it is less important which semantics are
>>>>>>> chosen; however, the user must be aware of how it works. The trickiest
>>>>>>> part of deterministic processing is dealing with duplicate timestamps
>>>>>>> (which cannot be avoided). The timestamping of (intermediate) result
>>>>>>> tuples is also an important question to be answered.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 04/07/2015 11:37 AM, Gyula Fóra wrote:
>>>>>>> Hey,
>>>>>>>
>>>>>>> I agree with Kostas: if we define the exact semantics of how this works,
>>>>>>> this is not more ad-hoc than any other stateful operator with multiple
>>>>>>> inputs. (And I don't think any other system supports something similar.)
>>>>>>>
>>>>>>> We need to make some design choices that are similar to the issues we had
>>>>>>> for windowing. We need to choose how we want to evaluate the windowing
>>>>>>> policies (global or local) because that affects what kind of policies can
>>>>>>> be parallel, but I can work on these things.
>>>>>>>
>>>>>>> I think this is an amazing feature, so I wouldn't necessarily rush the
>>>>>>> implementation for 0.9 though.
>>>>>>>
>>>>>>> And thanks for helping to write these down.
>>>>>>>
>>>>>>> Gyula
>>>>>>>
>>>>>>> On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <ktzoumas@apache.org
>>>>>>> <mailto:ktzoumas@apache.org>> wrote:
>>>>>>>
>>>>>>> Yes, we should write these semantics down. I volunteer to help.
>>>>>>>
>>>>>>> I don't think that this is very ad-hoc. The semantics are basically the
>>>>>>> following. Assuming an arriving element from the left side:
>>>>>>> (1) We find the right-side matches
>>>>>>> (2) We insert the left-side arrival into the left window
>>>>>>> (3) We recompute the left window
>>>>>>> We need to see whether right window re-computation needs to be triggered
>>>>>>> as well. I think that this way of joining streams is also what the
>>>>>>> symmetric hash join algorithms were meant to support.
>>>>>>>
>>>>>>> Kostas
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <sewen@apache.org
>>>>>>> <mailto:sewen@apache.org>> wrote:
>>>>>>>
>>>>>>> Is the approach of joining an element at a time from one input against a
>>>>>>> window on the other input not a bit arbitrary?
>>>>>>>
>>>>>>> This just joins whatever currently happens to be the window by the time
>>>>>>> the single element arrives - that is a bit non-predictable, right?
>>>>>>>
>>>>>>> As a more general point: The whole semantics of windowing and when they
>>>>>>> are triggered are a bit ad-hoc now. It would be really good to start
>>>>>>> formalizing that a bit and put it down somewhere. Users need to be able
>>>>>>> to clearly understand and predict the output.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.fora@gmail.com
>>>>>>> <mailto:gyula.fora@gmail.com>> wrote:
>>>>>>>
>>>>>>> I think it should be possible to make this compatible with the
>>>>>>> .window().every() calls. Maybe if there is some trigger set in "every" we
>>>>>>> would not join that stream 1 by 1 but every so many elements. The problem
>>>>>>> here is that the window and every in this case are very different from
>>>>>>> the normal windowing semantics. The window would define the join window
>>>>>>> for each element of the other stream, while every would define how often
>>>>>>> I join this stream with the other one.
>>>>>>>
>>>>>>> We need to think about how to make this intuitive.
>>>>>>>
>>>>>>> On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <
>>>>>>> balassi.marton@gmail.com<mailto:balassi.marton@gmail.com>> wrote:
>>>>>>>
>>>>>>> That would be really neat. The problem I see there is that we do not
>>>>>>> distinguish between dataStream.window() and dataStream.window().every()
>>>>>>> currently: they both return WindowedDataStreams, and the TriggerPolicies
>>>>>>> of the every call do not make much sense in this setting (in fact,
>>>>>>> practically the trigger is always set to a count of one).
>>>>>>>
>>>>>>> But of course we could make it in a way that we check that the eviction
>>>>>>> should be either null or count of 1; in every other case we throw an
>>>>>>> exception while building the JobGraph.
>>>>>>>
>>>>>>> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <
>>>>>>> aljoscha@apache.org<mailto:aljoscha@apache.org>> wrote:
>>>>>>>
>>>>>>> Or you could define it like this:
>>>>>>>
>>>>>>> stream_A = a.window(...)
>>>>>>> stream_B = b.window(...)
>>>>>>>
>>>>>>> stream_A.join(stream_B).where().equals().with()
>>>>>>>
>>>>>>> So a join would just be a join of two WindowedDataStreams. This would
>>>>>>> neatly move the windowing stuff into one place.
>>>>>>>
>>>>>>> On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <
>>>>>>> balassi.marton@gmail.com<mailto:balassi.marton@gmail.com>> wrote:
>>>>>>>
>>>>>>> Big +1 for the proposal for Peter and Gyula. I'm really for bringing the
>>>>>>> windowing and window join API in sync.
>>>>>>>
>>>>>>> On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyfora@apache.org
>>>>>>> <mailto:gyfora@apache.org>> wrote:
>>>>>>>
>>>>>>> Hey guys,
>>>>>>>
>>>>>>> As Aljoscha has highlighted earlier, the current window join semantics in
>>>>>>> the streaming API don't follow the changes in the windowing API. More
>>>>>>> precisely, we currently only support joins over time windows of equal
>>>>>>> size on both streams. The reason for this is that we now take a window of
>>>>>>> each of the two streams and do joins over these pairs. This would be a
>>>>>>> blocking operation if the windows are not closed at exactly the same time
>>>>>>> (and since we don't want this, we only allow time windows).
>>>>>>>
>>>>>>> I talked with Peter, who came up with the initial idea of an alternative
>>>>>>> approach for stream joins which works as follows:
>>>>>>>
>>>>>>> Instead of pairing windows for joins, we do element-against-window joins.
>>>>>>> What this means is that whenever we receive an element from one of the
>>>>>>> streams, we join this element with the current window (this window is
>>>>>>> constantly updated) of the other stream. This is non-blocking on any
>>>>>>> window definitions as we don't have to wait for windows to be completed,
>>>>>>> and we can use this with any of our predefined policies like
>>>>>>> Time.of(...), Count.of(...), Delta.of(....).
>>>>>>>
>>>>>>> Additionally, this also allows some very flexible ways of defining window
>>>>>>> joins. With this we could also define grouped windowing inside of a join.
>>>>>>> An example of this would be: Join all elements of Stream1 with the last 5
>>>>>>> elements by a given windowkey of Stream2 on some join key.
>>>>>>>
>>>>>>> This feature can be easily implemented over the current operators, so I
>>>>>>> already have a working prototype for the simple non-grouped case. My only
>>>>>>> concern is the API; the best thing I could come up with is something like
>>>>>>> this:
>>>>>>>
>>>>>>> stream_A.join(stream_B)
>>>>>>>     .onWindow(windowDefA, windowDefB)
>>>>>>>     .by(windowKey1, windowKey2)
>>>>>>>     .where(...).equalTo(...).with(...)
>>>>>>>
>>>>>>> (the user can omit the "by" and "with" calls)
>>>>>>>
>>>>>>> I think this new approach would be worthy of our "flexible windowing" in
>>>>>>> contrast with the current approach.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Gyula
>>>>>>>
>
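
For reference, the element-against-window join proposed at the start of the
quoted thread, with the three steps Kostas listed (find matches on the other
side, insert the arrival into its own window, recompute that window), could
look roughly like the plain-Python sketch below. It is not the Flink API:
the class and method names are made up for illustration, and only a
count-based "last N elements" window is assumed.

```python
from collections import deque


class ElementWindowJoin:
    """Join each arriving element against the current window of the other side.

    Hypothetical sketch: both windows are count windows that keep the last N
    (key, value) pairs of their stream.
    """

    def __init__(self, left_size, right_size):
        self.windows = {
            "left": deque(maxlen=left_size),
            "right": deque(maxlen=right_size),
        }

    def on_element(self, side, key, value):
        other = "right" if side == "left" else "left"
        # (1) Find matches in the current window of the opposite stream.
        if side == "left":
            matches = [(value, v) for k, v in self.windows[other] if k == key]
        else:
            matches = [(v, value) for k, v in self.windows[other] if k == key]
        # (2) + (3) Insert the arrival into its own window; the bounded deque
        # drops the oldest element once the window is full.
        self.windows[side].append((key, value))
        return matches


join = ElementWindowJoin(left_size=2, right_size=2)
first = join.on_element("left", "a", 1)     # nothing on the right yet
second = join.on_element("right", "a", 10)  # matches the left element
```

The output pairs are always ordered (left value, right value). Which matches
are produced depends on arrival order, which is exactly the non-determinism
discussed in the thread.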
