flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: How to perform this join operation?
Date Wed, 20 Apr 2016 17:13:04 GMT
Hi Elias,

sorry for the late reply. You're right that with the windowed join you
would have to deal with pairs where the timestamp of (x,y) is not
necessarily earlier than the timestamp of z. Moreover, by using sliding
windows you would receive duplicates as you've described. Using tumbling
windows would mean that you lose join matches if (x,y) lives in an earlier
window. Thus, in order to solve your problem you would have to write a
custom stream operator.

The stream operator would do the following: collect the inputs from (x,y)
and z, which are already keyed, so we know that x = z holds. Using a
priority queue per input, we order the elements, because we don't know in
which order they arrive at the operator. Whenever we receive a watermark
indicating that no earlier events can arrive anymore, we go through the two
priority queues to join the elements. The queues are part of the operator's
state so that we don't lose information in case of a recovery.

I've sketched such an operator here [1]. I hope this helps you to get
started.

[1] https://github.com/tillrohrmann/custom-join


On Thu, Apr 14, 2016 at 5:12 PM, Elias Levy <fearsome.lucidity@gmail.com>
wrote:

> Anyone from Data Artisans have some idea of how to go about this?
> On Wed, Apr 13, 2016 at 5:32 PM, Maxim <mfateev@gmail.com> wrote:
>> You could simulate the Samza approach by having a RichFlatMapFunction
>> over cogrouped streams that maintains the sliding window in its ListState.
>> As I understand it, the drawback is that the list state is not maintained
>> in managed memory.
>> I'm interested to hear about the right way to implement this.
>> On Wed, Apr 13, 2016 at 3:53 PM, Elias Levy <fearsome.lucidity@gmail.com>
>> wrote:
>>> I am wondering how you would implement the following function in Flink.
>>> The function takes as input two streams.  One stream can be viewed as a
>>> stream of tuples with two values *(x, y)*, the second stream is a stream of
>>> individual values *z*.  The function keeps a time based window on the
>>> first input (say, 24 hours).  Whenever it receives an element from the
>>> second stream, it compares the value *z* against the *x* element of
>>> each tuple in the window, and for each match it emits *(x, y)*.  You
>>> are basically doing a join on *x=z*.  Note that values from the second
>>> stream are not windowed and they are only matched to values from the first
>>> stream with an earlier timestamp.
>>> This was relatively easy to implement in Samza.  Consume off two topics,
>>> the first keyed by *x* and the second by *z*.  Consume both topics in a
>>> job.  Messages with the same key would be consumed by the same task.  The
>>> task could maintain a window of messages from the first stream in its local
>>> state.  Whenever a message came in via the second stream, it could look up
>>> matching messages in the local state, and if it found any, send them to
>>> the output stream.  Obviously, with Samza you don't have the luxury of the
>>> system handling event time for you, but this works well and it is easy to
>>> implement.
>>> I am not clear how this would be implemented in Flink.
>>> It is easy enough to partition either stream by key, and to window the
>>> first stream using a sliding window, but from there out things get
>>> complicated.
>>> You can join the two streams by key, but you must do so using the same
>>> window for both streams.  That means events from the first stream may be
>>> matched to older events of the second stream, which is not what we want.  I
>>> suppose if both included a timestamp, you could later add a filter to
>>> remove such events from the merged stream.  But you would also have to deal
>>> with duplicates, as the window is a sliding window and the same two
>>> elements may match across all window panes that contain the matching
>>> elements.  So you need to dedup as well.
>>> coGroup seems like it would suffer from the same issues.
>>> Maybe the answer is connected streams, but there is scant documentation
>>> on the semantics of ConnectedStreams.  There isn't even an example that I
>>> could find that makes use of them.
>>> Thoughts?
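
For reference, the Samza-style approach described above boils down to
something like the following sketch (plain Python with hypothetical names,
not Samza's actual API; it assumes messages arrive roughly in timestamp
order, which is what a per-key task reading from its local state would see):

```python
from collections import defaultdict, deque


class SamzaStyleJoinTask:
    """Per-key join task: keeps a time window of (x, y) messages in local
    state and, for each incoming z, emits every buffered (x, y) with an
    earlier (or equal) timestamp. Event-time handling is left to the
    caller, as in Samza."""

    def __init__(self, window_size):
        self.window_size = window_size
        self.state = defaultdict(deque)  # key -> deque of (ts, x, y)

    def on_xy(self, ts, x, y):
        # messages are keyed by x, so buffer under that key
        self.state[x].append((ts, x, y))

    def on_z(self, ts, z):
        buf = self.state[z]
        # expire entries that have fallen out of the time window
        while buf and buf[0][0] < ts - self.window_size:
            buf.popleft()
        # emit all buffered matches with an earlier (or equal) timestamp
        return [(x, y) for (t, x, y) in buf if t <= ts]
```

The local state here is a plain in-memory deque; in Samza it would be a
changelog-backed store, and in Flink the equivalent buffer would live in
checkpointed keyed state.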
