flink-user mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: time-windowed joins and tumbling windows
Date Fri, 13 Mar 2020 13:41:59 GMT
Hi Vinod,

I cannot spot any problems in your SQL query.

Some questions for clarification:
1) Which planner are you using?
2) How do you create your watermarks?
3) Did you unit test with only parallelism of 1 or higher?
4) Can you share the output of TableEnvironment.explain() with us?
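While collecting those answers, it may help to split the combined ratio into separate counters and expose the window end. That makes it visible whether the windows fire at all and which join stops matching. This is a debugging sketch with the same join shape as the query quoted below. The original WHERE filters are omitted, and the output column names are illustrative only.

```sql
-- Debugging sketch: same joins as the quoted query, WHERE filters omitted.
SELECT o.region_code,
       TUMBLE_END(o.rowtime, INTERVAL '5' MINUTE) AS window_end,
       COUNT(1) AS offers,
       -- offers that found an order within 1 hour
       SUM(CASE WHEN r.order_id IS NOT NULL THEN 1 ELSE 0 END) AS matched_orders,
       -- orders that found a cancellation within 1 hour
       SUM(CASE WHEN c.order_id IS NOT NULL THEN 1 ELSE 0 END) AS matched_cancellations
  FROM (SELECT region_code, offer_id, rowtime FROM event_offer_created) o
  LEFT JOIN (SELECT offer_id, order_id, rowtime FROM event_order_requested) r
    ON o.offer_id = r.offer_id
   AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR
  LEFT JOIN (SELECT order_id, rowtime FROM event_order_cancelled) c
    ON r.order_id = c.order_id
   AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' HOUR
 GROUP BY o.region_code,
          TUMBLE(o.rowtime, INTERVAL '5' MINUTE)
```

If `offers` is non-zero but `matched_orders` stays at zero, the first join is not matching. Possible causes are the keys or the rowtime values. If no rows are emitted at all, the watermarks are a more likely suspect.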

Shouldn't the join condition for c constrain c.rowtime relative to o.rowtime 
instead of r.rowtime, so that all time-based operations work on o.rowtime?
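The following is a sketch of that suggestion. It keeps the original query shape but bounds c.rowtime against o.rowtime. The two-hour upper bound is an assumption derived from the two chained 1-hour windows (Z <= Y + 1h <= X + 2h). The original WHERE filters are omitted.

```sql
SELECT o.region_code,
       concat_ws(
         '/',
         CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL THEN 1 ELSE 0 END) AS VARCHAR),
         CAST(count(1) AS VARCHAR)
       ) AS offer_conversion_5m
  FROM (SELECT region_code, offer_id, rowtime FROM event_offer_created) o
  LEFT JOIN (SELECT offer_id, order_id, rowtime FROM event_order_requested) r
    ON o.offer_id = r.offer_id
   AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR
  LEFT JOIN (SELECT order_id, rowtime FROM event_order_cancelled) c
    ON r.order_id = c.order_id
   -- bound the cancellation against the offer time instead of the order time
   AND c.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '2' HOUR
 GROUP BY o.region_code,
          TUMBLE(o.rowtime, INTERVAL '5' MINUTE)
```

With this bound, both time-windowed joins and the tumbling window all refer to the same time attribute, o.rowtime.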

Regards,
Timo


On 10.03.20 19:26, Vinod Mehra wrote:
> Hi!
> 
> We are testing the following three-way time-windowed join to keep the 
> retained state size small. This is our first time using joins. It works 
> in unit tests, but we are not able to get the expected results in production. 
> We are still troubleshooting this issue. Can you please help us review 
> this in case we missed something or our assumptions are wrong?
> 
> SELECT o.region_code,
>        concat_ws(
>          '/',
>          CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL THEN 1 ELSE 0 END) AS VARCHAR),
>          CAST(count(1) AS VARCHAR)
>        ) AS offer_conversion_5m
>   FROM (
>         SELECT region_code,
>                offer_id,
>                rowtime
>           FROM event_offer_created
>          WHERE ...
>        ) o
>   LEFT JOIN (
>         SELECT offer_id,
>                order_id,
>                rowtime
>           FROM event_order_requested
>          WHERE ...
>        ) r
>     ON o.offer_id = r.offer_id
>    AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' hour
>   LEFT JOIN (
>         SELECT order_id,
>                rowtime
>           FROM event_order_cancelled
>          WHERE ...
>        ) c
>     ON r.order_id = c.order_id
>    AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
>  GROUP BY
>        o.region_code,
>        TUMBLE(o.rowtime, INTERVAL '5' minute)
> 
> 
> The sequence of events is:
> 
>  1. At time X an offer is created (event stream = "*event_offer_created*")
>  2. At time Y that offer is used to create an order (event stream =
>     "*event_order_requested*"). Left join because not all offers get used.
>  3. At time Z that order is cancelled (event stream =
>     "*event_order_cancelled*"). Left join because not all orders get
>     cancelled.
> 
> "*offer_conversion_5m*" represents the number of converted orders / total 
> number of offers in a 5-minute bucket. If an order gets cancelled, 
> we don't want to count it. That's why we have [c.order_id IS NULL THEN 
> 1 ELSE 0 END] in the select.
> 
> We picked 1 hour time windows because that's the maximum time we expect 
> the successive events to take for a given record chain.
> 
> The outer GROUP BY computes a 5-minute aggregation for each region. As 
> expected, the watermark lags 2 hours behind the current time because of the 
> two time-windowed joins above. The IdleStateRetentionTime is not set, so 
> the expectation is that the state will be retained according to the time 
> window size, and that it will be cleaned up as records fall out of the 
> window. The aggregated state is expected to be kept around for 5 
> minutes (GROUP BY).
> 
> However, we are unable to see the conversion (offer_created -> 
> order_requested (without order_cancelled)). '*offer_conversion_5m*' is 
> always zero although we know the streams contain records that should 
> have incremented the count. Any idea what could be wrong? Is the state 
> being dropped too early (5 mins) because of the outer 5 minute tumbling 
> window?
> 
> Thanks,
> Vinod

