flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Xingcan Cui (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
Date Wed, 19 Jul 2017 11:57:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092967#comment-16092967
] 

Xingcan Cui commented on FLINK-6233:
------------------------------------

Hi, [~Yuhong_kyo], thanks for the assignment. I'll devote myself to it.

Hi, [~fhueske], recently, I'm working on a time restricted theta-join for {{DataStream}} in
the API level. A demo can be found [here|https://goo.gl/9WJiqT] and a corresponding design
document is almost finished.

The motivation for this job is that, I think we need a "general backend" in the API level
to support different kinds of joins (e.g., based on rowtime, proctime) in the SQL/Table level,
or it will be hard to maintain the diverse (and dynamically generated?) codes in the future.

Currently, we have separated the (inner) join into different pieces, and only a partial of
the criteria can be supported now (that's why we need [FLINK-7217|https://issues.apache.org/jira/browse/FLINK-7217]:)).
I just wonder if we could first provide a general purpose theta-join API, which may not be
so efficient but can get rid of most restrictions on the join condition (e.g., equi-keys).
The API can either be public or be reserved for internal use. After that, we can further do
some optimizations for dedicated join conditions to make the execution more efficient. What
do you think?

As for the challenge you mentioned, I'm not sure if it's related to [the question|https://lists.apache.org/thread.html/4f529715984195997441ff80922125019436fe07bc768105d26b0eb6@%3Cdev.flink.apache.org%3E]
I asked before. To keep a consistent time system for the whole stream, we are really faced
with the "time interruption" problem, e.g., how to define and handle the timestamp of an aggregated
or joined result, especially when further processing is needed. Do I get the point?



> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time streams to
the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only support inner join
> * The ON clause should include equi-join condition
> * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}}
only can use rowtime that is a system attribute, the time condition only support bounded time
range like {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1'
HOUR}}, not support unbounded like {{o.rowtime &lt; s.rowtime}} ,  and  should include
both two stream's rowtime attribute, {{o.rowtime between rowtime () and rowtime () + 1}} should
also not be supported.
> An row-time streams join will not be able to handle late data, because this would mean
in insert a row into a sorted order shift all other computations. This would be too expensive
to maintain. Therefore, we will throw an error if a user tries to use an row-time stream join
with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message