flink-user mailing list archives

From Elias Levy <fearsome.lucid...@gmail.com>
Subject Re: How to perform this join operation?
Date Thu, 14 Apr 2016 15:12:29 GMT
Anyone from Data Artisans have some idea of how to go about this?

On Wed, Apr 13, 2016 at 5:32 PM, Maxim <mfateev@gmail.com> wrote:

> You could simulate the Samza approach by having a RichFlatMapFunction over
> cogrouped streams that maintains the sliding window in its ListState. As I
> understand it, the drawback is that the list state is not maintained in
> managed memory.
> I'm interested to hear about the right way to implement this.
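A minimal sketch of the operator Maxim describes, written here as plain Java (no Flink dependency) so the logic is self-contained; the class and method names mirror Flink's two-input `CoFlatMapFunction` shape but are otherwise hypothetical, and the per-key `Map` stands in for Flink's keyed state. One handler buffers timestamped `(x, y)` tuples; the other probes the buffer for the key, evicting entries older than the 24-hour window:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a keyed two-input flatMap: a per-key buffer of
// (x, y, timestamp) from the first stream, probed by values from the second.
public class KeyedJoinSketch {
    static final long WINDOW_MS = 24L * 60 * 60 * 1000; // 24-hour window

    // One buffered element of the first stream.
    public record Tuple(String x, String y, long ts) {}

    // State keyed by x (in Flink this would be per-key managed state).
    private final Map<String, Deque<Tuple>> buffer = new HashMap<>();

    // First input: store (x, y) with its event timestamp.
    public void flatMap1(String x, String y, long ts) {
        buffer.computeIfAbsent(x, k -> new ArrayDeque<>()).addLast(new Tuple(x, y, ts));
    }

    // Second input: emit every buffered (x, y) with x == z and an earlier
    // timestamp, first evicting tuples that fell out of the 24-hour window.
    public List<Tuple> flatMap2(String z, long ts) {
        List<Tuple> out = new ArrayList<>();
        Deque<Tuple> q = buffer.getOrDefault(z, new ArrayDeque<>());
        while (!q.isEmpty() && q.peekFirst().ts() < ts - WINDOW_MS) {
            q.pollFirst(); // expired: outside the window
        }
        for (Tuple t : q) {
            if (t.ts() <= ts) out.add(t); // only earlier (or equal) timestamps
        }
        return out;
    }
}
```

In Flink this shape would hang off `streamA.connect(streamB).keyBy(...)`, with the buffer held in state rather than a plain `HashMap`; as Maxim notes, whether that state lives in managed memory depends on the backend.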
> On Wed, Apr 13, 2016 at 3:53 PM, Elias Levy <fearsome.lucidity@gmail.com>
> wrote:
>> I am wondering how you would implement the following function in Flink.
>> The function takes as input two streams.  One stream can be viewed as a
>> tuple with two values *(x, y)*; the second stream is a stream of
>> individual values *z*.  The function keeps a time based window on the
>> first input (say, 24 hours).  Whenever it receives an element from the
>> second stream, it compares the value *z* against the *x* element of each
>> tuple in the window, and for each match it emits *(x, y)*.  You are
>> basically doing a join on *x=z*.  Note that values from the second
>> stream are not windowed and they are only matched to values from the first
>> stream with earlier timestamps.
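To pin the semantics down, here is a brute-force reference implementation in plain Java (names hypothetical): each *z* event matches every *(x, y)* tuple with *x = z* whose timestamp falls within the preceding 24 hours.

```java
import java.util.ArrayList;
import java.util.List;

// Reference semantics for the join described above: a z event at time ts
// matches every (x, y) tuple with x == z whose timestamp lies in (ts - 24h, ts].
public class JoinSemantics {
    static final long WINDOW_MS = 24L * 60 * 60 * 1000;

    public record XY(String x, String y, long ts) {}
    public record Z(String z, long ts) {}

    // O(|zs| * |xys|) by design: this only defines the expected output,
    // it is not how one would implement it in a streaming job.
    public static List<XY> join(List<XY> xys, List<Z> zs) {
        List<XY> out = new ArrayList<>();
        for (Z z : zs) {
            for (XY t : xys) {
                boolean sameKey  = t.x().equals(z.z());
                boolean earlier  = t.ts() <= z.ts();             // only earlier tuples
                boolean inWindow = t.ts() > z.ts() - WINDOW_MS;  // within 24 hours
                if (sameKey && earlier && inWindow) out.add(t);
            }
        }
        return out;
    }
}
```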
>> This was relatively easy to implement in Samza.  Consume off two topics,
>> the first keyed by *x* and the second by *z*.  Consume both topics in a
>> job.  Messages with the same key would be consumed by the same task.  The
>> task could maintain a window of messages from the first stream in its local
>> state.  Whenever a message came in via the second stream, it could look up
>> in the local state for matching messages, and if it found any, send them to
>> the output stream.  Obviously, with Samza you don't have the luxury of the
>> system handling event time for you, but this works well and is easy to
>> implement.
>> I am not clear how this would be implemented in Flink.
>> It is easy enough to partition either stream by key, and to window the
>> first stream using a sliding window, but from there on things get
>> complicated.
>> You can join the two streams by key, but you must do so using the same
>> window for both streams.  That means events from the first stream may be
>> matched to older events of the second stream, which is not what we want.  I
>> suppose if both included a timestamp, you could later add a filter to
>> remove such events from the merged stream.  But you would also have to
>> deal with duplicates: with a sliding window, the same two elements can
>> match in every pane that contains them both.  So you need to dedup as well.
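The post-processing described here could be sketched as follows in plain Java (the `Pair` shape is hypothetical, assuming the joined output carries both original timestamps): drop pairs where the first-stream element is not earlier, and dedup the repeats produced by overlapping panes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Cleanup after a symmetric windowed join: keep only pairs where the
// first-stream element precedes the second-stream element, and drop
// duplicates produced by overlapping sliding-window panes.
public class JoinCleanup {
    public record Pair(String x, String y, long xTs, String z, long zTs) {}

    public static List<Pair> cleanup(List<Pair> joined) {
        List<Pair> out = new ArrayList<>();
        Set<Pair> seen = new HashSet<>(); // records compare by value, so a set dedups them
        for (Pair p : joined) {
            if (p.xTs() <= p.zTs() && seen.add(p)) {
                out.add(p);
            }
        }
        return out;
    }
}
```

In a real job this filter-and-dedup stage would itself need keyed state with some expiry, which is part of why the windowed-join route feels awkward here.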
>> coGroup seems like it would suffer from the same issues.
>> Maybe the answer is connected streams, but there is scant documentation
>> on the semantics of ConnectedStreams.  I could not even find an example
>> that makes use of them.
>> Thoughts?
