flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
Date Thu, 17 Aug 2017 22:03:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16131364#comment-16131364
] 

Fabian Hueske commented on FLINK-6233:
--------------------------------------

Hi [~xccui], sorry for the late reply.

You are right that the clean-up of a window needs to be triggered by the watermark of the
other input. Basically, the watermark tells the operator that no more records are expected from
that input that could join with certain records of the other input. Those records can be discarded
from the state.
A benefit of handling the watermarks of both inputs separately would be that records from the
slower input can be joined immediately and completely with the state of the faster input. They
would not have to be put into state to wait for further records from the faster input, because
the higher watermark tells us that all of those have already been received.
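
To make the two-watermark idea concrete, here is a minimal sketch in plain Python. It is a toy model and not the Flink operator: the class {{TwoInputWindowJoin}}, its method names and the integer timestamps are all made up for illustration. It assumes the join condition {{l.t BETWEEN r.t - lower AND r.t + upper}}, that a watermark {{w}} means no further records with a timestamp {{<= w}} will arrive on that input, and it ignores late records.

```python
class TwoInputWindowJoin:
    """Toy model: left row l joins right row r if r.t - lower <= l.t <= r.t + upper."""

    def __init__(self, lower, upper):
        self.lower, self.upper = lower, upper
        self.left_state, self.right_state = [], []   # lists of (timestamp, row)
        self.left_wm = self.right_wm = float("-inf")

    def _matches(self, lt, rt):
        return rt - self.lower <= lt <= rt + self.upper

    def on_left(self, t, row):
        out = [(row, r) for rt, r in self.right_state if self._matches(t, rt)]
        # Store the row only if the right input can still deliver matching rows.
        if self.right_wm < t + self.lower:
            self.left_state.append((t, row))
        return out

    def on_right(self, t, row):
        out = [(l, row) for lt, l in self.left_state if self._matches(lt, t)]
        # Store the row only if the left input can still deliver matching rows.
        if self.left_wm < t + self.upper:
            self.right_state.append((t, row))
        return out

    def on_left_watermark(self, wm):
        # The LEFT watermark cleans up the RIGHT state.
        self.left_wm = max(self.left_wm, wm)
        self.right_state = [(rt, r) for rt, r in self.right_state
                            if rt + self.upper > self.left_wm]

    def on_right_watermark(self, wm):
        # The RIGHT watermark cleans up the LEFT state.
        self.right_wm = max(self.right_wm, wm)
        self.left_state = [(lt, l) for lt, l in self.left_state
                           if lt + self.lower > self.right_wm]


join = TwoInputWindowJoin(lower=0, upper=10)
join.on_right(5, "s1")            # the right input is the faster one
join.on_right_watermark(20)
joined = join.on_left(8, "o1")    # joined right away, never put into state
join.on_left_watermark(15)        # s1 can no longer match, so it is dropped
```

In this run the slower left record is joined against the right state as soon as it arrives and is not stored, and the right record is only discarded once the left watermark has passed the end of its window.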

Regarding your question
{quote}Did you mean that even for the rowtime join, the clean up timers should also use the
ctx.registerProcessingTimeTimer() instead of ctx.registerEventTimeTimer()? I noticed that
there's another issue (FLINK-7388) about the onTimer() method, but not sure if it's relative.{quote}

Yes, event-time operators should also implement the state retention time policy based on processing
time. However, we don't need this for the windowed join operator. Windowed operators can (and
must) automatically clear their complete state as time progresses. The state retention timers
were added for operators that need to keep state forever to ensure correct semantics. By removing
state, we give up correct semantics (in some cases) but ensure that the query does not leak
state and can run for a very long time.
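
As a rough illustration of such a processing-time retention policy, the following plain Python sketch keeps per-key state and drops it once it has not been updated for a configured time. It does not use any Flink API: {{RetentionState}}, {{update}} and {{on_processing_time}} are hypothetical names, and the processing time is passed in explicitly instead of being read from a clock.

```python
class RetentionState:
    """Toy keyed state whose entries expire after a processing-time retention period."""

    def __init__(self, retention_ms):
        self.retention_ms = retention_ms
        self.values = {}       # key -> state value
        self.cleanup_at = {}   # key -> processing time at which the state may be dropped

    def update(self, key, value, now_ms):
        # Every update pushes the clean-up time further into the future.
        self.values[key] = value
        self.cleanup_at[key] = now_ms + self.retention_ms

    def get(self, key):
        return self.values.get(key)

    def on_processing_time(self, now_ms):
        # Plays the role of a processing-time timer firing at now_ms.
        expired = [k for k, t in self.cleanup_at.items() if t <= now_ms]
        for k in expired:
            del self.values[k]
            del self.cleanup_at[k]
        return expired


state = RetentionState(retention_ms=1000)
state.update("a", 1, now_ms=0)
state.update("b", 2, now_ms=500)
dropped = state.on_processing_time(1000)   # only "a" has been idle long enough
```

After the timer fires, a later record for key {{a}} would find no state. This is the loss of correct semantics mentioned above, accepted in exchange for bounded state size.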

I'm currently on vacation and don't have much time for reviewing and no dev machine with me.
I'll try to take a look at your code in the next few days but can't promise.

Best, Fabian

> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-joins on rowtime streams to
the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition, e.g. {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}},
may only use rowtime attributes that are system attributes. It must define a bounded time
range such as {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1'
HOUR}}; unbounded conditions such as {{o.rowtime < s.rowtime}} are not supported. It must also
reference the rowtime attributes of both streams, so {{o.rowtime between rowtime () and rowtime () + 1}}
is not supported either.
> A row-time stream join will not be able to handle late data, because inserting a row into
a sorted order would shift all other computations. This would be too expensive
to maintain. Therefore, we will throw an error if a user tries to use a row-time stream join
with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 
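
For reference, the rows that the example query is expected to return can be sketched with a nested loop over two small in-memory lists. This is only an illustration of the join predicate in plain Python, under the assumption that orders are {{(rowtime, productId, orderId)}} tuples and shipments are {{(rowtime, orderId)}} tuples. The function name {{rowtime_join}} and the sample data are made up, and the sketch says nothing about how the streaming operator evaluates the join.

```python
from datetime import datetime, timedelta


def rowtime_join(orders, shipments, upper=timedelta(hours=1)):
    """Return (o.rowtime, productId, orderId, shipTime) for all matching pairs."""
    result = []
    for o_time, product_id, o_id in orders:
        for s_time, s_id in shipments:
            # Equi-join condition plus the bounded time condition
            # o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR.
            if o_id == s_id and s_time <= o_time <= s_time + upper:
                result.append((o_time, product_id, o_id, s_time))
    return result


t0 = datetime(2017, 8, 17, 12, 0)
orders = [
    (t0 + timedelta(minutes=30), "p1", 1),   # inside the one-hour range
    (t0 + timedelta(hours=2), "p2", 2),      # outside the one-hour range
]
shipments = [(t0, 1), (t0, 2)]
rows = rowtime_join(orders, shipments)
```

Only order 1 is returned: order 2 has a matching {{orderId}}, but its rowtime lies more than one hour after the shipment's rowtime.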



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
