flink-user mailing list archives

From Henry Cai <h...@pinterest.com>
Subject Re: streaming join implementation
Date Thu, 14 Apr 2016 17:25:41 GMT
Cogroup is nice, thanks.

But if I define a tumbling window of one day, does that mean Flink needs to
cache all the data for one day in memory?  I have about 5 TB of data coming
in per day.  About 50% of the records will find a matching record (the other
50% don't).


On Thu, Apr 14, 2016 at 9:05 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> right now, Flink does not give you a way to get the records that were
> not joined in a join. You can, however, use a co-group operation instead of
> a join to figure out which records did not join with records from the other
> side and treat them separately.
>
> Let me show an example:
>
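> import org.apache.flink.api.common.functions.CoGroupFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.util.Collector
>
> val input1: DataStream[A] = ...
> val input2: DataStream[B] = ...
>
> // O is your output type, e.g. one that can carry either a join result
> // or a "did not join" notice
> val result: DataStream[O] = input1.coGroup(input2)
>   .where(_.key1)
>   .equalTo(_.key2)
>   .window(TumblingEventTimeWindows.of(Time.days(1)))
>   .apply(new MyCoGroupFunction)
>
> class MyCoGroupFunction extends CoGroupFunction[A, B, O] {
>   override def coGroup(first: java.lang.Iterable[A],
>                        second: java.lang.Iterable[B],
>                        out: Collector[O]): Unit = {
>     if (!first.iterator().hasNext) {
>       // no element from the first input had this key in this window
>       out.collect(<message telling that I only have second input elements>)
>     } else if (!second.iterator().hasNext) {
>       // no element from the second input had this key in this window
>       out.collect(<message telling that I only have first input elements>)
>     } else {
>       // perform the actual join using the two iterables
>     }
>   }
> }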
>
> The result will be a stream that contains both join results as well as the
> elements telling you that something didn't join. You can process this
> stream further by splitting it into different streams of only proper join
> results and non-joined elements and so on.
>
> I hope this helps somewhat.
>
> Cheers,
> Aljoscha
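The co-group approach above can be tried out without a Flink cluster. The sketch below is a plain-Scala simulation (no Flink dependency) of what the co-group function sees for one window: both inputs are grouped by key, the function is called once per key, and a key present on only one side produces a "did not join" element instead of a join result. The record types `A` and `B`, their `payload` field, and the output cases `Joined`, `OnlyFirst` and `OnlySecond` are hypothetical stand-ins, not Flink types.

```scala
// Plain-Scala simulation of the co-group pattern for ONE window's worth of data.
// No Flink dependency; record types, fields, and output cases are hypothetical.
final case class A(key1: String, payload: String)
final case class B(key2: String, payload: String)

// One output type covering join results and both kinds of unmatched records
sealed trait JoinOutput
final case class Joined(key: String, left: String, right: String) extends JoinOutput
final case class OnlyFirst(key: String, payloads: Seq[String]) extends JoinOutput
final case class OnlySecond(key: String, payloads: Seq[String]) extends JoinOutput

object CoGroupSketch {
  // Mirrors MyCoGroupFunction: invoked once per key with every element of that key
  def coGroup(key: String, first: Seq[A], second: Seq[B]): Seq[JoinOutput] =
    if (first.isEmpty) Seq(OnlySecond(key, second.map(_.payload)))
    else if (second.isEmpty) Seq(OnlyFirst(key, first.map(_.payload)))
    else for (a <- first; b <- second) yield Joined(key, a.payload, b.payload)

  // Groups both inputs by key and calls coGroup once per key seen on either side
  def coGroupWindow(as: Seq[A], bs: Seq[B]): Seq[JoinOutput] = {
    val left = as.groupBy(_.key1)
    val right = bs.groupBy(_.key2)
    (left.keySet ++ right.keySet).toSeq.sorted.flatMap { k =>
      coGroup(k, left.getOrElse(k, Seq.empty), right.getOrElse(k, Seq.empty))
    }
  }

  // Splits the combined output into proper join results and non-joined elements
  def split(out: Seq[JoinOutput]): (Seq[Joined], Seq[JoinOutput]) =
    (out.collect { case j: Joined => j }, out.filterNot(_.isInstanceOf[Joined]))
}

val windowOutput = CoGroupSketch.coGroupWindow(
  Seq(A("k1", "a1"), A("k2", "a2")),
  Seq(B("k1", "b1"), B("k3", "b3")))
val (joinResults, notJoined) = CoGroupSketch.split(windowOutput)
```

With this sample input, `k1` produces a join result, while `k2` and `k3` come out as non-joined elements that can be routed to separate processing.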
> On Thu, 14 Apr 2016 at 08:55 Balaji Rajagopalan <
> balaji.rajagopalan@olacabs.com> wrote:
>
>> Let me give you a specific example. Say event1 on stream1 with key1 happens
>> within your window 0-5 min, and event2 on stream2 with key2, which could have
>> matched key1, happens at 5:01, outside the join window. Now you have to
>> correlate event2 on stream2 with event1 on stream1, which happened in the
>> previous window; this is the corner case I mentioned before. I am not aware
>> of Flink solving this problem for you. It would be nice if it did, instead of
>> solving it in the application.
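The corner case comes from how tumbling windows are assigned. The sketch below assumes the common zero-offset assignment `start = ts - (ts % size)` over non-negative millisecond timestamps, and places the hypothetical event1 at 4:59 and event2 at 5:01. The object and helper names are illustrative only.

```scala
// Zero-offset tumbling window assignment over millisecond timestamps (assumed >= 0).
object TumblingSketch {
  val FiveMinutes: Long = 5L * 60 * 1000
  val OneDay: Long = 24L * 60 * 60 * 1000

  // Returns the [start, end) window that contains tsMillis
  def windowFor(tsMillis: Long, sizeMillis: Long): (Long, Long) = {
    val start = tsMillis - (tsMillis % sizeMillis)
    (start, start + sizeMillis)
  }

  // Helper: minutes and seconds after time zero, in milliseconds
  def at(minutes: Long, seconds: Long): Long = (minutes * 60 + seconds) * 1000
}

import TumblingSketch._

val event1Window = windowFor(at(4, 59), FiveMinutes) // (0, 300000)
val event2Window = windowFor(at(5, 1), FiveMinutes)  // (300000, 600000)

// With a one-day size both events fall into the same window
val event1Day = windowFor(at(4, 59), OneDay)
val event2Day = windowFor(at(5, 1), OneDay)
```

The two events land in different five-minute windows, so a windowed join never sees them together. A one-day window puts them in the same window, but the same split still happens for records on either side of midnight.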
>>
>> On Thu, Apr 14, 2016 at 12:10 PM, Henry Cai <hcai@pinterest.com> wrote:
>>
>>> Thanks Balaji.  Do you mean you spill the non-matching records into Redis
>>> after 5 minutes?  Does Flink give you control over which records did not
>>> match in the current window, so that you can copy them into long-term
>>> storage?
>>>
>>>
>>>
>>> On Wed, Apr 13, 2016 at 11:20 PM, Balaji Rajagopalan <
>>> balaji.rajagopalan@olacabs.com> wrote:
>>>
>>>> You can implement a join in Flink (which is an inner join) as in the
>>>> pseudo code below. The join below is over a 5-minute window; yes, there will
>>>> be some corner cases where data arriving after the 5 minutes is missed
>>>> by the join window. I actually solved this problem by storing some
>>>> data in Redis and writing correlation logic to take care of the corner cases
>>>> that were missed by the join window.
>>>>
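>>>> import org.apache.flink.streaming.api.scala._
>>>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>> val output: DataStream[OutputData] = stream1.join(stream2).where(_.key1).equalTo(_.key2)
>>>>   .window(TumblingEventTimeWindows.of(Time.minutes(5)))
>>>>   .apply(new SomeJoinFunction)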
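Balaji does not show his correlation logic, so the following is only a guess at its general shape: whatever a window emits as unmatched is parked in a key-value store (a mutable `Map` stands in for Redis here), and unmatched records from later windows are checked against it. The class name `LeftoverCorrelator`, the `maxGapMillis` tolerance, the event types, and the one-record-per-key simplification are all assumptions.

```scala
import scala.collection.mutable

// Hypothetical sketch: correlate records that missed their join window.
// A mutable Map stands in for Redis; all names here are assumptions.
final case class S1Event(key: String, tsMillis: Long)
final case class S2Event(key: String, tsMillis: Long)

class LeftoverCorrelator(maxGapMillis: Long) {
  // Parked unmatched records, at most one per join key (a simplification)
  private val parked1 = mutable.Map.empty[String, S1Event]
  private val parked2 = mutable.Map.empty[String, S2Event]

  private def withinGap(a: S1Event, b: S2Event): Boolean =
    math.abs(a.tsMillis - b.tsMillis) <= maxGapMillis

  // Feed the unmatched records a window emitted; returns the late matches found
  def onUnmatched(from1: Seq[S1Event], from2: Seq[S2Event]): Seq[(S1Event, S2Event)] = {
    val matches = mutable.ArrayBuffer.empty[(S1Event, S2Event)]
    for (a <- from1) parked2.get(a.key) match {
      case Some(b) if withinGap(a, b) => matches += ((a, b)); parked2.remove(a.key)
      case _                          => parked1(a.key) = a // park until a partner shows up
    }
    for (b <- from2) parked1.get(b.key) match {
      case Some(a) if withinGap(a, b) => matches += ((a, b)); parked1.remove(b.key)
      case _                          => parked2(b.key) = b
    }
    matches.toList
  }
}

val correlator = new LeftoverCorrelator(maxGapMillis = 5L * 60 * 1000)
// Window [0, 5 min) leaves event1 unmatched; window [5, 10 min) leaves event2 unmatched
val fromWindow1 = correlator.onUnmatched(Seq(S1Event("key1", 299000L)), Seq.empty)
val fromWindow2 = correlator.onUnmatched(Seq.empty, Seq(S2Event("key1", 301000L)))
```

A real store would also need an expiry policy (for Redis, a TTL on each key) so that records that never find a partner do not accumulate forever.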
>>>>
>>>>
>>>> On Thu, Apr 14, 2016 at 10:02 AM, Henry Cai <hcai@pinterest.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We are evaluating different streaming platforms.  For a typical join
>>>>> between two streams:
>>>>>
>>>>> SELECT a.*, b.*
>>>>> FROM a JOIN b
>>>>> ON a.id = b.id
>>>>>
>>>>> How does Flink implement the join?  The matching record from either
>>>>> stream can arrive late; we consider it a valid join as long as the event
>>>>> times of records a and b fall on the same day.
>>>>>
>>>>> I think some streaming platforms (e.g. Google Dataflow) store the
>>>>> records from both streams in a K/V lookup store and do the lookup later.
>>>>> Is this how Flink implements the streaming join?
>>>>>
>>>>> If we need to store all the records in a state store, that's going to
>>>>> be a lot of records for a day.
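The K/V lookup approach described in this question is usually called a symmetric hash join. A minimal sketch, assuming UTC day boundaries and an in-memory map as the state store: each record is stored under `(id, day of its event time)` and immediately probed against the other stream's records with the same key, so a late partner still joins as long as both event times fall on the same day. All names here (`SameDayJoin`, `RecA`, `RecB`, `expireBefore`) are hypothetical; this is not a description of how Flink or Dataflow implement joins.

```scala
import scala.collection.mutable

// Hypothetical symmetric hash join: records are stored under (id, UTC day of event time)
// and probed against the other side's records with the same (id, day).
final case class RecA(id: String, eventTimeMillis: Long, value: String)
final case class RecB(id: String, eventTimeMillis: Long, value: String)

class SameDayJoin {
  private val DayMillis = 24L * 60 * 60 * 1000
  private val stateA = mutable.Map.empty[(String, Long), List[RecA]]
  private val stateB = mutable.Map.empty[(String, Long), List[RecB]]

  // Key used for both storage and lookup: join id plus event-time day index
  private def keyOf(id: String, ts: Long): (String, Long) = (id, Math.floorDiv(ts, DayMillis))

  // Stores the record, then emits a pair for every B already seen with the same (id, day)
  def onA(a: RecA): List[(RecA, RecB)] = {
    val k = keyOf(a.id, a.eventTimeMillis)
    stateA(k) = a :: stateA.getOrElse(k, Nil)
    stateB.getOrElse(k, Nil).map(b => (a, b))
  }

  // Symmetric to onA for records from the second stream
  def onB(b: RecB): List[(RecA, RecB)] = {
    val k = keyOf(b.id, b.eventTimeMillis)
    stateB(k) = b :: stateB.getOrElse(k, Nil)
    stateA.getOrElse(k, Nil).map(a => (a, b))
  }

  // Drops state for days before `day`; call once those days can no longer receive late data
  def expireBefore(day: Long): Unit = {
    stateA.keys.filter(_._2 < day).toList.foreach(k => stateA.remove(k))
    stateB.keys.filter(_._2 < day).toList.foreach(k => stateB.remove(k))
  }

  // Total number of records currently held in state
  def storedRecords: Int = stateA.values.map(_.size).sum + stateB.values.map(_.size).sum
}
```

The state holds every record of a day until `expireBefore` drops it, which is why a same-day join horizon means keeping roughly a full day of both streams (about 5 TB per day in this case) in whatever backs the store.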
>>>>>
>>>>>
>>>>
>>>
>>
