flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Unexpected behavior from interval join in Flink
Date Mon, 24 Jun 2019 09:46:58 GMT
Ah, that's great!
Thanks for letting us know :-)

Am Mo., 24. Juni 2019 um 11:33 Uhr schrieb Wouter Zorgdrager <
W.D.Zorgdrager@tudelft.nl>:

> Hi Fabian,
>
> Thanks for your reply. I managed to resolve the issue. Actually, the
> behavior was not so unexpected after all: I mistakenly used xStream as the
> 'base' stream while I needed to use yStream as the 'base', i.e.
> yStream.element - 60 min <= xStream.element <= yStream.element + 30 min.
> Interchanging the two datastreams fixed the issue.
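>
> A minimal sketch of the interchanged join (assuming the same Scala
> DataStream API as in the snippet below, with the hypothetical element
> types X and Y from that snippet) could look like:

```scala
// Sketch only: X and Y stand for the element types of the two streams,
// each assumed to have a `date` field (with getTime) and a `commonKey` field.
// With yStream as the 'base' (left) stream, between(-60, +30) expresses:
//   yStream.element - 60 min <= xStream.element <= yStream.element + 30 min
yStream
  .assignAscendingTimestamps(_.date.getTime)
  .keyBy(_.commonKey)
  .intervalJoin(
    xStream
      .assignAscendingTimestamps(_.date.getTime)
      .keyBy(_.commonKey))
  .between(Time.minutes(-60), Time.minutes(30))
  .process(new ProcessJoinFunction[Y, X, String] {
    override def processElement(
        left: Y,
        right: X,
        ctx: ProcessJoinFunction[Y, X, String]#Context,
        out: Collector[String]): Unit = {
      // Emit the enriched pair; left is the base (Y) element.
      out.collect(left + ":" + right)
    }
  })
```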
>
> Thanks anyways.
>
> Cheers, Wouter
>
>
>
> Op ma 24 jun. 2019 om 11:22 schreef Fabian Hueske <fhueske@gmail.com>:
>
>> Hi Wouter,
>>
>> Not sure what is going wrong there, but something you could try is to
>> use a custom watermark assigner that always returns a watermark of 0.
>> When a source finishes serving its records, it emits a final
>> Long.MAX_VALUE watermark.
>> Hence the join would consume all events and store them in state. Only
>> when both sources are finished would it start to join the data and clean
>> up the state.
>> This test would show whether there are any issues with late data.
>>
>> Best, Fabian
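>>
>> A minimal sketch of such a test assigner (assuming the
>> AssignerWithPeriodicWatermarks interface from Flink's pre-1.11
>> DataStream API, and the hypothetical element type X from the snippet
>> below) could look like:

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Test-only assigner: extract the real event timestamp, but always
// report a watermark of 0 so no element can be considered late.
// When the (finite) source is exhausted, Flink emits a final
// Long.MAX_VALUE watermark, which then triggers the buffered join.
class ZeroWatermarkAssigner extends AssignerWithPeriodicWatermarks[X] {
  override def getCurrentWatermark: Watermark = new Watermark(0L)

  override def extractTimestamp(element: X, previousTimestamp: Long): Long =
    element.date.getTime
}

// usage sketch:
// xStream.assignTimestampsAndWatermarks(new ZeroWatermarkAssigner)
```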
>>
>> Am Fr., 21. Juni 2019 um 15:32 Uhr schrieb Wouter Zorgdrager <
>> W.D.Zorgdrager@tudelft.nl>:
>>
>>> Does anyone have leads on this issue? I have been looking into the
>>> IntervalJoinOperator code, but that didn't really help. My intuition is
>>> that elements are rejected because of lateness, but that still confuses
>>> me since I'm sure that both datastreams have monotonically increasing
>>> timestamps.
>>>
>>> Thx, Wouter
>>>
>>> Op ma 17 jun. 2019 om 13:20 schreef Wouter Zorgdrager <
>>> W.D.Zorgdrager@tudelft.nl>:
>>>
>>>> Hi all,
>>>>
>>>> I'm experiencing some unexpected behavior using an interval join in
>>>> Flink.
>>>> I'm dealing with two data sets, let's call them X and Y. They are finite
>>>> (10k elements each), but I interpret them as a DataStream. The data needs
>>>> to be joined for enrichment purposes. I use event time, and I know
>>>> (because I generated the data myself) that the timestamp of an element of
>>>> Y is always between -60 minutes and +30 minutes of the element with the
>>>> same key in set X. Both datasets are in order (in terms of timestamps),
>>>> equal in size, and share a common key, and parallelism is set to 1
>>>> throughout the whole program.
>>>>
>>>> The code to join looks something like this:
>>>>
>>>> xStream
>>>>   .assignAscendingTimestamps(_.date.getTime)
>>>>   .keyBy(_.commonKey)
>>>>   .intervalJoin(
>>>>     yStream
>>>>       .assignAscendingTimestamps(_.date.getTime)
>>>>       .keyBy(_.commonKey))
>>>>   .between(Time.minutes(-60), Time.minutes(30))
>>>>   .process(new ProcessJoinFunction[X, Y, String] {
>>>>     override def processElement(
>>>>         left: X,
>>>>         right: Y,
>>>>         ctx: ProcessJoinFunction[X, Y, String]#Context,
>>>>         out: Collector[String]): Unit = {
>>>>       out.collect(left + ":" + right)
>>>>     }
>>>>   })
>>>>
>>>> However, about 30% of the data is not joined. Is there a proper way to
>>>> debug this? For instance, with windows you can side-output late data.
>>>> Is there a way to side-output unjoinable data?
>>>>
>>>> Thx a lot,
>>>> Wouter
>>>>
>>>>
>>>>
