flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
Date Mon, 14 Aug 2017 21:01:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16126380#comment-16126380 ]

Fabian Hueske edited comment on FLINK-6233 at 8/14/17 9:00 PM:
---------------------------------------------------------------

Hi [~xccui]:

1. Yes, if both implementations can share significant parts of their code, please do so.
2. No, all cleanup timers should be on processing time. This is important for consistency
with the other operators.
3. The current design is optimized for the RocksDBStateBackend, which returns keys in order.
For RocksDB (and hence for most serious use cases) the current implementation is optimal.
IMO, we should not change that.
4. We will add an allowed latency parameter later in the QueryConfig. You can prepare the
operator to handle this case if it does not add substantial code complexity.

I think separate timer services are an optimization that could be added later. I haven't thought
in detail about this, but if I'm not mistaken separate timers would only be beneficial in
certain combinations of time range join predicates and delayed streams and would not improve
the performance / reduce the state size of a join in all cases. [~xccui] what are your thoughts
on this?



was (Author: fhueske):
Hi [~xccui]:

#1 Yes, if both implementations can share significant parts of their code please do so.
#1 No, all clean up timers should be on processing time. This is important for consistency
with the other operators.
#1 The current design is optimized for the RocksDBStateBackend which returns keys in order.
For RocksDB (and hence for most serious use cases) the current implementation is optimal.
IMO, we should not change that.
#1 We will add an allowed latency parameter later in the QueryConfig. You can prepare the
operator to handle this case if it does not add substantial code complexity.

I think separate timer services are an optimization that could be added later. I haven't thought
in detail about this, but if I'm not mistaken separate timers would only be beneficial in
certain combinations of time range join predicates and delayed streams and would not improve
the performance / reduce the state size of a join in all cases. [~xccui] what are your thoughts
on this?


> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-joins on row-time streams to
> the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}}
> may only use a rowtime that is a system attribute. Only bounded time ranges such as
> {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}
> are supported; unbounded conditions such as {{o.rowtime < s.rowtime}} are not. The
> condition must reference the rowtime attributes of both streams, so
> {{o.rowtime between rowtime () and rowtime () + 1}} is also not supported.
> A row-time stream join will not be able to handle late data, because inserting a row
> into a sorted order would shift all other computations. This would be too expensive
> to maintain. Therefore, we will throw an error if a user tries to use a row-time stream
> join with late data handling.
> This issue includes:
> * Design of the DataStream operator that handles stream joins
> * Translation from Calcite's RelNode representation (LogicalJoin). 
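
The bounded time condition in the description above can be illustrated with a small batch-style sketch. This is only an illustration in Python, not the Flink operator: the `Row` class, the function names, and the millisecond timestamps are all hypothetical. It also sketches an assumption that the issue does not state, namely why a bounded range lets a streaming join discard state: once the watermark of the other input reaches a row's last useful timestamp, no future row from that input can match it.

```python
from dataclasses import dataclass
from typing import List, Tuple

HOUR_MS = 60 * 60 * 1000  # one hour in milliseconds


@dataclass(frozen=True)
class Row:
    """Hypothetical row: a join key plus an event-time (rowtime) stamp."""
    order_id: int
    rowtime: int  # milliseconds


def bounded_rowtime_join(orders: List[Row], shipments: List[Row],
                         lower_ms: int, upper_ms: int) -> List[Tuple[Row, Row]]:
    """Return all (order, shipment) pairs with equal keys and
    s.rowtime + lower_ms <= o.rowtime <= s.rowtime + upper_ms."""
    return [
        (o, s)
        for o in orders
        for s in shipments
        if o.order_id == s.order_id
        and s.rowtime + lower_ms <= o.rowtime <= s.rowtime + upper_ms
    ]


def order_droppable_at(order: Row, lower_ms: int) -> int:
    """Assumed retention bound: an order can only match shipments with
    s.rowtime <= o.rowtime - lower_ms, so it is no longer needed once the
    shipment-side watermark reaches this value."""
    return order.rowtime - lower_ms


def shipment_droppable_at(shipment: Row, upper_ms: int) -> int:
    """Assumed retention bound: a shipment can only match orders with
    o.rowtime <= s.rowtime + upper_ms, so it is no longer needed once the
    order-side watermark reaches this value."""
    return shipment.rowtime + upper_ms


# Mirrors: o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR
orders = [Row(1, 1000), Row(1, 1000 + 2 * HOUR_MS), Row(2, 500)]
shipments = [Row(1, 0), Row(2, 1000)]
matches = bounded_rowtime_join(orders, shipments, 0, HOUR_MS)
```

With the unbounded predicate `o.rowtime < s.rowtime`, no such finite bound exists for orders, since any later shipment could still match. That is presumably why the description excludes unbounded conditions.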



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
