flink-issues mailing list archives

From "Martijn Visser (Jira)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-23749) Testing Window Join
Date Tue, 14 Sep 2021 09:09:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-23749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414826#comment-17414826
] 

Martijn Visser commented on FLINK-23749:
----------------------------------------

I've run multiple tests and validated the output of each. I used the following statements
to set up the source tables:

{code:sql}
CREATE TABLE LeftTable (
    row_time   TIMESTAMP(3),
    num INT,
    id STRING,
    WATERMARK FOR row_time AS row_time - INTERVAL '1' SECOND
) WITH (
  'connector' = 'faker',
  'fields.row_time.expression' = '#{date.past ''1'',''SECONDS''}',
  'fields.num.expression' = '#{regexify ''(1|2|3)''}',
  'fields.id.expression' = '#{regexify ''(L1|L2|L3)''}',
  'rows-per-second' = '1'
);

CREATE TABLE RightTable (
    row_time   TIMESTAMP(3),
    num INT,
    id STRING,
    WATERMARK FOR row_time AS row_time - INTERVAL '1' SECOND
) WITH (
  'connector' = 'faker',
  'fields.row_time.expression' = '#{date.past ''1'',''SECONDS''}',
  'fields.num.expression' = '#{regexify ''(1|2|3)''}',
  'fields.id.expression' = '#{regexify ''(R1|R2|R3)''}',
  'rows-per-second' = '1'
);
{code}

Full outer join (I also tested inner, left and right joins):
{code:sql}
SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id, L.window_start, L.window_end
FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) L
FULL JOIN (
    SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) R
ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
{code}
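
For reference, the inner variant differs only in the join keyword. The block below is a sketch of that query shape against the same LeftTable and RightTable, not a copy of the exact statement used in the test run:

{code:sql}
-- Sketch: same shape as the full outer join, with INNER JOIN instead.
-- Only rows whose num matches within the same 5-second tumbling window are emitted.
SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id, L.window_start, L.window_end
FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) L
INNER JOIN (
    SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) R
ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
{code}

In the right and full outer variants, rows without a match on the left side carry NULL in every L.* column, including L.window_start and L.window_end. Selecting COALESCE(L.window_start, R.window_start) and COALESCE(L.window_end, R.window_end) is one way to keep the window bounds visible for those rows.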

I also tried a semi window join and an anti window join, but in 1.14 RC0 they both suffer from
https://issues.apache.org/jira/browse/FLINK-24186.

{code:sql}
SELECT *
FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) L WHERE L.num IN (
    SELECT num FROM (
        SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
    ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
{code}

{noformat}
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Found more than one rowtime field: [row_time, window_time]
in the query when insert into 'default_catalog.default_database.Unregistered_Collect_Sink_35'.
Please select the rowtime field that should be used as event-time timestamp for the DataStream
by casting all other fields to TIMESTAMP.
{noformat}

I've pinged [~twalthr] and he'll also merge the fix to the release-1.14 branch. So these queries
currently throw an error, but that should be resolved in the next RC and the final release.
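
Until the fix is available, one possible workaround is to keep the outer query from returning more than one rowtime column by projecting explicit columns instead of SELECT *. This is an untested sketch based only on the error text. It assumes that row_time and window_time are the only rowtime fields (as the message lists) and that window_start and window_end are plain TIMESTAMP columns, which is consistent with the full outer join above selecting them without error:

{code:sql}
-- Untested sketch: project only non-rowtime columns from the left side,
-- so the result carries neither row_time nor window_time.
SELECT L.num, L.id, L.window_start, L.window_end
FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
) L WHERE L.num IN (
    SELECT num FROM (
        SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' SECONDS))
    ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
{code}

The anti-join form of the same sketch would use NOT IN in place of IN.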

Last but not least, I also tried this one, and it works too:

{code:sql}
CREATE TABLE orders ( 
    item_id STRING,
    pay_time TIMESTAMP(3),
    price DOUBLE, 
    item STRING,
    user_id STRING,
    seller_id STRING,
    WATERMARK FOR pay_time AS pay_time
) WITH (
  'connector' = 'faker',
  'fields.item_id.expression' = '#{regexify ''(Apples|Pears|Bananas|Oranges|Strawberries)''}',
  'fields.pay_time.expression' = '#{date.past ''1'',''SECONDS''}',
  'fields.price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
  'fields.item.expression' = '#{Commerce.productName}',
  'fields.user_id.expression' = '#{Address.firstName}',
  'fields.seller_id.expression' = '#{regexify ''(Alice|Bob|Carol|Alex|Joe|James|Jane|Jack)''}',
  'rows-per-second' = '100'
);

SELECT item_id, a.seller_id, a.window_start, a.window_end, sales, item_buyer_cnt, slr_buyer_cnt
FROM (
    SELECT item_id, seller_id, window_start, window_end, SUM(price) as sales, COUNT(DISTINCT user_id) as item_buyer_cnt
    FROM TABLE(
        CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' SECONDS, INTERVAL '1' DAY))
    GROUP BY item_id, seller_id, window_start, window_end
) a
LEFT JOIN ( -- using window join to enrich the buyer count of the shop from 00:00 to current minute
    SELECT seller_id, window_start, window_end, COUNT(DISTINCT user_id) as slr_buyer_cnt
    FROM TABLE(
        CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' SECONDS, INTERVAL '1' DAY))
    GROUP BY seller_id, window_start, window_end
) b
ON a.seller_id = b.seller_id AND a.window_start = b.window_start AND a.window_end = b.window_end;
{code}

Conclusion: the test was successful, and we should be good to go once https://issues.apache.org/jira/browse/FLINK-24186
is also merged into release-1.14.

> Testing Window Join
> -------------------
>
>                 Key: FLINK-23749
>                 URL: https://issues.apache.org/jira/browse/FLINK-23749
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>            Reporter: JING ZHANG
>            Assignee: Martijn Visser
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.14.0
>
>
> A window join requires the join condition to contain equality of the input tables' window starts and equality of their window ends. The semantics of a window join are the same as those of the [DataStream window join|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/joining.html#window-join].
> {code:sql}
> SELECT ...
> FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations with a windowing TVF applied
> ON L.window_start = R.window_start AND L.window_end = R.window_end AND ...
> {code}
> In the future, we can also simplify the join condition to include only the window start equality if the windowing TVF is {{TUMBLE}} or {{HOP}}. Currently, the windowing TVFs must be the same for the left and right inputs. This can be extended in the future, for example to let tumbling windows join sliding windows with the same window size.
> Currently, Flink supports not only a Window Join that follows a [Window Aggregation|https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/], but also a Window Join that follows a [Windowing TVF|https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
