flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
Date Wed, 19 Jul 2017 13:09:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16093041#comment-16093041 ]

Fabian Hueske commented on FLINK-6233:

Hi [~xccui],

I agree that it would be nice to support arbitrary joins; however, I am not sure that this
should be our current focus. I think theta joins are much less common than equality joins and
might also be prohibitively expensive to compute. If I read the code of your demo correctly,
you are implementing a broadcast-forward strategy to compute theta joins and joining the results
in a non-keyed ProcessFunction.
The problem with non-keyed operators is that they cannot use keyed state but only operator
state. However, operator state is not integrated with Flink's state backends and is hence
kept on the JVM heap, so it cannot use RocksDB to spill to disk. A non-keyed operator that
holds too much state on the heap will fail. In the context of a broadcast join, this is especially
important because one stream will be replicated to all parallel operator instances and needs to be
stored as state. I agree that a somewhat inefficient join would be good to have as a fallback, but
IMO it also needs to be reliable, which cannot be achieved with operator state (at least for
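
As a rough illustration of the state concern above, the following plain-Python sketch (not Flink code) models a broadcast-forward join: every parallel instance buffers a full copy of the broadcast stream plus its own share of the forwarded stream. The function name and the round-robin assignment of forwarded rows are assumptions made only for this example.

```python
def broadcast_forward_state(broadcast_rows, forwarded_rows, parallelism):
    """Model the rows each parallel instance has to buffer (hypothetical helper)."""
    instances = [{"broadcast": [], "forwarded": []} for _ in range(parallelism)]

    # Broadcast side: every row is replicated to every instance.
    for row in broadcast_rows:
        for instance in instances:
            instance["broadcast"].append(row)

    # Forwarded side: each row lives in exactly one instance.
    # Round-robin is only a stand-in for "stays in its own partition".
    for i, row in enumerate(forwarded_rows):
        instances[i % parallelism]["forwarded"].append(row)

    return instances


def total_buffered_rows(instances):
    """Sum of all rows held across all instances."""
    return sum(len(i["broadcast"]) + len(i["forwarded"]) for i in instances)


instances = broadcast_forward_state(range(1000), range(1000), parallelism=8)
print(total_buffered_rows(instances))
```

In this model the buffered volume of the broadcast side grows linearly with the parallelism, which is why keeping it on the heap becomes a reliability issue.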

Regarding the timestamp issue, I think you got the point. Let's say we have the following

// Table left: [a, b, ltime]
// Table right: [c, d, rtime]

FROM left, right
WHERE a = c AND ltime BETWEEN rtime - INTERVAL '1' HOUR AND rtime + INTERVAL '1' HOUR

{{left}} and {{right}} are streaming tables and their rows are timestamped with {{ltime}}
and {{rtime}} respectively.
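
To make the semantics of the query above concrete, here is a minimal batch-style sketch in plain Python (not the Table API). It evaluates the equi-join condition together with the inclusive {{BETWEEN}} bounds; the function names and the sample rows are made up for illustration.

```python
from datetime import datetime, timedelta

BOUND = timedelta(hours=1)


def time_condition(ltime, rtime, bound=BOUND):
    """ltime BETWEEN rtime - bound AND rtime + bound (both ends inclusive)."""
    return rtime - bound <= ltime <= rtime + bound


def window_join(left_rows, right_rows):
    """Join rows [a, b, ltime] with rows [c, d, rtime] on a = c and the time condition."""
    result = []
    for a, b, ltime in left_rows:
        for c, d, rtime in right_rows:
            if a == c and time_condition(ltime, rtime):
                # Both timestamps are still present in the joined row.
                result.append((a, b, ltime, c, d, rtime))
    return result


t0 = datetime(2017, 7, 19, 12, 0)
left_rows = [(1, "x", t0), (2, "y", t0)]
right_rows = [
    (1, "p", t0 + timedelta(minutes=30)),  # within one hour of t0
    (1, "q", t0 + timedelta(hours=2)),     # too far from t0
    (2, "r", t0 - timedelta(hours=1)),     # exactly on the bound
]
for row in window_join(left_rows, right_rows):
    print(row)
```

Note that each joined row carries both {{ltime}} and {{rtime}}, which is where the question of the result's row timestamp comes from.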
When we do {{SELECT *}}, it is not clear which of the two timestamps should be assigned to the
result of the join. There can only be one row timestamp; the other timestamp can be materialized
Even if we know which timestamp we want to materialize, we have to hold back the watermarks
so that we do not emit late data. Holding back watermarks is a difficult business, though,
and cannot be done with a ProcessFunction but only in a custom operator.
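
One way to picture why the watermarks have to be held back is the plain-Python sketch below. It is not Flink operator code and not necessarily how an implementation would do it. It assumes that a watermark W means "no more rows with timestamp <= W will arrive" and that the joined row keeps {{ltime}}, which in the example query can lag up to one hour behind {{rtime}}. All names are hypothetical.

```python
HOUR_MS = 60 * 60 * 1000


def held_back_watermark(left_wm, right_wm, max_lag_ms):
    """Watermark that is safe to emit downstream of the join.

    max_lag_ms is how far the timestamp kept on the result may lag behind
    the timestamp of the other input (one hour in the example query).
    """
    return min(left_wm, right_wm) - max_lag_ms


def is_late(row_ts, watermark):
    """A row is late if its timestamp is not greater than the watermark."""
    return row_ts <= watermark


# Both inputs have progressed to 10:00 (in ms since some epoch).
left_wm = right_wm = 10 * HOUR_MS

# A right row with rtime just after 10:00 may still arrive and join a
# buffered left row whose ltime is just after 09:00.
result_ts = 9 * HOUR_MS + 1

print(is_late(result_ts, min(left_wm, right_wm)))                        # forwarded as-is
print(is_late(result_ts, held_back_watermark(left_wm, right_wm, HOUR_MS)))  # held back
```

Forwarding the input watermark unchanged would declare this join result late, whereas the held-back watermark does not.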

> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
> The goal of this issue is to add support for inner equi-joins on row-time streams to
> the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}}
> can only use a rowtime that is a system attribute. The time condition only supports bounded
> time ranges such as {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}},
> not unbounded ones such as {{o.rowtime < s.rowtime}}, and must include the rowtime attributes
> of both streams; {{o.rowtime between rowtime () and rowtime () + 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because this would mean
> inserting a row into a sorted order and shifting all other computations. This would be too
> expensive to maintain. Therefore, we will throw an error if a user tries to use a row-time
> stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 

This message was sent by Atlassian JIRA
