flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6077) Support In/Exists/Except/Any /Some/All for Stream SQL
Date Thu, 30 Mar 2017 12:13:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15948952#comment-15948952 ]

Fabian Hueske commented on FLINK-6077:
--------------------------------------

Hi,

I think evaluating an IN or EXCEPT predicate over a stream might be tricky.
This only works if the inner and outer queries are correlated on the time attribute.

For instance, you can specify a query like this:

{code}
SELECT a 
FROM s1
WHERE b IN (SELECT x FROM s2
            WHERE s2.proctime BETWEEN s1.proctime - INTERVAL '1' HOUR AND s1.proctime)
{code}

This would filter out all rows of s1 whose value of b was not observed in s2 within the last hour.
Without the time correlation, we cannot compute the query, because the result of the subquery
might change at a later point in time and we would then need to reprocess the whole stream s1.
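A minimal plain-Java sketch (not Flink code) of what the query above computes, assuming both streams are available as in-memory lists sorted by processing time in milliseconds; the class, record, and method names are hypothetical. Because the subquery is bounded to the hour before each s1 row, only s2 rows from that interval have to be kept, and older rows can be evicted for good:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TimeCorrelatedIn {
    static final long HOUR_MS = 60L * 60L * 1000L;

    // SELECT a FROM s1 WHERE b IN (SELECT x FROM s2
    //   WHERE s2.proctime BETWEEN s1.proctime - INTERVAL '1' HOUR AND s1.proctime)
    // Both inputs must be sorted by proctime (milliseconds).
    static List<String> correlatedIn(List<S1Row> s1, List<S2Row> s2) {
        List<String> out = new ArrayList<>();
        Deque<S2Row> window = new ArrayDeque<>();       // s2 rows inside the current 1-hour interval
        Map<Integer, Integer> counts = new HashMap<>(); // how often each x occurs in the interval
        int next = 0;
        for (S1Row row : s1) {
            // Admit s2 rows that arrived at or before this s1 row.
            while (next < s2.size() && s2.get(next).proctime() <= row.proctime()) {
                S2Row r = s2.get(next++);
                window.addLast(r);
                counts.merge(r.x(), 1, Integer::sum);
            }
            // Evict s2 rows older than one hour; no later s1 row can match them.
            while (!window.isEmpty() && window.peekFirst().proctime() < row.proctime() - HOUR_MS) {
                S2Row old = window.removeFirst();
                counts.computeIfPresent(old.x(), (k, c) -> c == 1 ? null : c - 1);
            }
            if (counts.containsKey(row.b())) {
                out.add(row.a());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<S2Row> s2 = List.of(new S2Row(7, 0L), new S2Row(9, 30 * 60_000L));
        List<S1Row> s1 = List.of(
                new S1Row("r1", 7, 10 * 60_000L),  // 7 arrived 10 min earlier: kept
                new S1Row("r2", 9, 20 * 60_000L),  // 9 has not arrived yet: dropped
                new S1Row("r3", 7, 90 * 60_000L)); // 7 arrived 90 min earlier: dropped
        System.out.println(correlatedIn(s1, s2)); // [r1]
    }
}

// Hypothetical row types: s1 carries (a, b), s2 carries x.
record S1Row(String a, int b, long proctime) {}
record S2Row(int x, long proctime) {}
```

The eviction step is only possible because of the time correlation: without it, every s2 row would have to be retained, and a late s2 row could still change the result for s1 rows that were already emitted.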

> Support In/Exists/Except/Any /Some/All for Stream SQL
> -----------------------------------------------------
>
>                 Key: FLINK-6077
>                 URL: https://issues.apache.org/jira/browse/FLINK-6077
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: radu
>         Attachments: in.png
>
>
> Time target: Proc Time
> SQL targeted query examples:
> ----------------------------
> With inner query
> Q1. ```SELECT client FROM stream1 WHERE id
> IN
> (SELECT id FROM stream2 WHERE salary > 4500 GROUP BY FLOOR(proctime TO HOUR), id)```
> Comment: A concrete example of this query is selecting the customers whose
> country is in the list of supplier countries (`SELECT customer FROM customers
> WHERE Country IN (SELECT Country FROM suppliers)`).
> Comment: This implementation depends on the implementation of the inner
> query. The structure can be the same as for inner query support, except
> that the LogicalJoin between the main query and the inner query is
> conditional.
> Comment: The inner query needs a bound; otherwise, it cannot be decided
> when to trigger.
> Comment: If the value is not triggered by the grouping expression, then
> the inner query must trigger based on when that expression changes value.
> Comment: Boundaries should be supported for all options: GROUP BY
> clauses, windows, or time expressions (`FLOOR/CEIL(rowtime/proctime TO
> HOUR)`).
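One way to read the triggering comments above, sketched in plain Java under the assumption that rows arrive in processing-time order and that the grouping expression is `FLOOR(proctime TO HOUR)`: the inner query's group for an hour is complete as soon as a row with a different floor value shows up. The class and method names are hypothetical, and the sketch only decides when to fire, not what the inner query computes:

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: decides when an inner query grouped by
// FLOOR(proctime TO HOUR) may fire, namely when the value of that
// grouping expression differs from the one of the previous row.
class HourBoundaryTrigger {
    private Instant currentBucket; // FLOOR(proctime TO HOUR) of the last row seen

    static Instant floorToHour(Instant t) {
        return t.truncatedTo(ChronoUnit.HOURS);
    }

    // Returns the bucket that has just been closed, or null if the row
    // still belongs to the current bucket.
    Instant onRow(Instant proctime) {
        Instant bucket = floorToHour(proctime);
        Instant closed = null;
        if (currentBucket != null && !bucket.equals(currentBucket)) {
            closed = currentBucket; // grouping expression changed: that group is complete
        }
        currentBucket = bucket;
        return closed;
    }

    public static void main(String[] args) {
        HourBoundaryTrigger trigger = new HourBoundaryTrigger();
        List<Instant> fired = new ArrayList<>();
        for (String ts : List.of("2017-03-30T10:02:00Z", "2017-03-30T10:59:59Z",
                                 "2017-03-30T11:25:00Z", "2017-03-30T12:06:00Z")) {
            Instant closed = trigger.onRow(Instant.parse(ts));
            if (closed != null) {
                fired.add(closed);
            }
        }
        System.out.println(fired); // [2017-03-30T10:00:00Z, 2017-03-30T11:00:00Z]
    }
}
```

Note that the 12:00 group stays open until a later row arrives; with processing time, a timer at the end of the hour would presumably be used to close it instead.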
> With collection
> Q2. ```SELECT * FROM stream1 WHERE b 
> IN 
> (5000, 7000, 8000, 9000)```
> Comment: It should be checked whether this is supported by the DataStreamCalc
> implementation. If not, it can become a sub-JIRA task to
> extend the DataStreamCalc functionality to implement this conditional
> behavior.
> Comment: A similar functionality can be provided if the collection is a
> table rather than a set of values.
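The literal-list case needs no state at all, which is why it may fit into a stateless operator such as DataStreamCalc. A plain-Java sketch of the per-row predicate, assuming a hypothetical row type with an integer column `b` (this is not the code DataStreamCalc generates):

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class InListFilter {
    // b IN (5000, 7000, 8000, 9000): the literal list becomes a set once, and each
    // incoming row only needs a stateless membership test.
    static Predicate<Stream1Row> inLiterals(Set<Integer> values) {
        return row -> values.contains(row.b());
    }

    public static void main(String[] args) {
        Predicate<Stream1Row> inList = inLiterals(Set.of(5000, 7000, 8000, 9000));
        List<Stream1Row> stream1 = List.of(
                new Stream1Row("x", 5000), new Stream1Row("y", 6000), new Stream1Row("z", 9000));
        List<String> kept = stream1.stream()
                .filter(inList)
                .map(Stream1Row::name)
                .collect(Collectors.toList());
        System.out.println(kept); // [x, z]
    }
}

// Hypothetical row of stream1: a name plus the integer column b.
record Stream1Row(String name, int b) {}
```

For the table variant, the literal set would be replaced by a set built from the table's rows, which stays this simple only as long as the table does not change while the stream is processed.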
> With table
> ```SELECT client FROM stream1 WHERE id
> IN
> (SELECT id FROM table1 WHERE stream1.id = table1.id)```
> Comment: This can be a sub-JIRA issue, perhaps within the context of dynamic tables,
> to support joins with tables and filtering operations based on the contents of an
> external table.
> General comments: **Except** behaves similarly to IN or EXISTS, as
> it filters out outputs of the main stream based on data from a secondary
> stream. The implementation will follow exactly the same logic as for
> IN/EXISTS by filtering the outputs in the join function between the main
> stream and the secondary stream. Additionally, we apply the same
> restrictions to the secondary/inner queries.
> ```SELECT ID, NAME FROM CUSTOMERS LEFT JOIN ORDERS ON CUSTOMERS.ID = ORDERS.CUSTOMER_ID
> EXCEPT
> SELECT ID, NAME FROM CUSTOMERS RIGHT JOIN ORDERS ON CUSTOMERS.ID = ORDERS.CUSTOMER_ID
> GROUP BY FLOOR(procTime TO HOUR);```
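A plain-Java sketch of EXCEPT evaluated within one bounded group (for example one FLOOR(procTime TO HOUR) bucket) once both sides of that group are complete. It assumes rows can be compared with `equals`; the class name and the string encoding of rows are hypothetical:

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: EXCEPT applied to the rows of one bounded group.
class HourlyExcept {
    // SQL EXCEPT (without ALL) returns the distinct rows of the left input
    // that have no equal row in the right input.
    static <T> Set<T> except(List<T> left, List<T> right) {
        Set<T> result = new LinkedHashSet<>(left); // removes duplicates, keeps arrival order
        result.removeAll(new HashSet<>(right));    // filter out rows seen on the secondary side
        return result;
    }

    public static void main(String[] args) {
        // Rows encoded as "ID,NAME" strings for brevity.
        List<String> left = List.of("1,Ann", "2,Bob", "2,Bob", "3,Cid");
        List<String> right = List.of("2,Bob");
        System.out.println(except(left, right)); // [1,Ann, 3,Cid]
    }
}
```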
> Description:
> ------------
> The IN and EXISTS operators are conditional clauses in the WHERE clause that
> check for values in certain collections. The collections used to restrict
> the values can either be static (values or tables) or parts of a stream.
> This JIRA issue is concerned with the latter case of checking for values
> over a stream. For this operation to work, the stream needs to be bounded
> so that the result can trigger and the collection can be formed. This
> points to using some boundaries or groupings over the sub-query that forms
> the collection over which IN is applied. This should be supported via 3
> options as shown below. Each of these options can be a sub-JIRA issue.
> 1)  Group By clauses that are applied over some monotonic order of the
>     stream based on which ranges are defined.
> `   [...] GROUP BY prodId`
> 3)  Window clauses to define rolling partitions of the data of the
>     stream, which evolve in time.
> `    [...] WINDOW HOUR AS (RANGE INTERVAL '10' MINUTE TO SECOND(3) PRECEDING);`
> Functionality example
> ---------------------
> We exemplify below the functionality of the IN/Exists when working with
> streams.
> ```SELECT * FROM stream1 WHERE id IN (SELECT id2 FROM stream2 WHERE b > 10
> GROUP BY FLOOR(PROCTIME TO HOUR), id2)```
> Note: The inner query triggers only once an hour. During the following hour, the result
> of the inner query for the previous hour is used to filter the rows of the main query as
> they arrive. This is also consistent with how inner queries are translated (see inner
> queries).
> ||IngestionTime (Event)||Stream1||Stream2||Output||
> |10:00:01|Id1,10| |nil|
> |10:02:00| |Id2,2| |
> |11:25:00| |Id3,15| |
> |12:03:00|Id2,15| |nil|
> |12:05:00|Id3,11| |Id3,11|
> |12:06:00| |Id2,30| |
> |12:07:00| |Id3,2| |
> |12:09:00|Id2,17| |nil|
> |12:10:00|Id3,20| |Id3,20|
> |...| | | |
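The table can be reproduced with a small plain-Java simulation of the rule in the note: the inner query fires at each hour boundary, and its result filters only the main-stream rows of the following hour. Assumptions: events arrive in order within a single day, the second field of a Stream2 row is `b`, and the class and record names are hypothetical:

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class HourlyInSimulation {
    // SELECT * FROM stream1 WHERE id IN (SELECT id2 FROM stream2 WHERE b > 10
    //   GROUP BY FLOOR(PROCTIME TO HOUR), id2)
    // Events must be in arrival order and lie within a single day.
    static List<String> run(List<Event> events) {
        List<String> outputs = new ArrayList<>();
        Set<String> previousHour = new HashSet<>(); // inner-query result of the last completed hour
        Set<String> currentHour = new HashSet<>();  // inner-query result still being collected
        int hour = -1;
        for (Event e : events) {
            int h = LocalTime.parse(e.time()).getHour(); // FLOOR(proctime TO HOUR)
            if (h != hour) {
                // The inner query fires at the hour boundary. Its result is used only
                // during the hour that directly follows; after a gap it is empty.
                previousHour = (h == hour + 1) ? currentHour : new HashSet<>();
                currentHour = new HashSet<>();
                hour = h;
            }
            if (e.stream().equals("stream2")) {
                if (e.value() > 10) {
                    currentHour.add(e.id());           // inner query: WHERE b > 10
                }
            } else if (previousHour.contains(e.id())) {
                outputs.add(e.id() + "," + e.value()); // id found: row is emitted
            } else {
                outputs.add("nil");                    // id not found: nothing emitted
            }
        }
        return outputs;
    }

    // The rows of the example table, with timestamps written as HH:mm:ss.
    static List<Event> tableEvents() {
        return List.of(
                new Event("10:00:01", "stream1", "Id1", 10),
                new Event("10:02:00", "stream2", "Id2", 2),
                new Event("11:25:00", "stream2", "Id3", 15),
                new Event("12:03:00", "stream1", "Id2", 15),
                new Event("12:05:00", "stream1", "Id3", 11),
                new Event("12:06:00", "stream2", "Id2", 30),
                new Event("12:07:00", "stream2", "Id3", 2),
                new Event("12:09:00", "stream1", "Id2", 17),
                new Event("12:10:00", "stream1", "Id3", 20));
    }

    public static void main(String[] args) {
        System.out.println(run(tableEvents())); // [nil, nil, Id3,11, nil, Id3,20]
    }
}

// Hypothetical event type: one row of the example table, from either stream.
record Event(String time, String stream, String id, int value) {}
```

Id2,30 arrives at 12:06, so it only becomes visible to main-stream rows in the 13:00 hour, which is why Id2,17 at 12:09 still produces nil.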
> Implementation option
> ---------------------
> Considering that the query only makes sense in the context of 1) window
> boundaries and 2) sub-queries that extract collections of data, the
> main design is based on the inner query implementation with the
> following modifications. (As a recap, the inner query is implemented with
> a special join [LEFT type with an always-true condition] between the main
> stream and the output of the inner query, which is passed through a
> single-value selection aggregation):
> 1)  The condition for outputting a result in the LogicalJoin is no longer
>     always true. Instead, the condition is evaluated within the
>     window function by checking that the input from the main stream is
>     within the collection from the inner query.
> 2)  The check depends on the type of function used (IN, ANY, SOME, ...).
>     The logic of each such function needs a direct implementation.
> 3)  The filter on the inner query to keep a single value is removed and
>     instead a collection is passed for evaluation in the join.
> 4)  The boundaries of the SQL query are to be used as the boundaries to
>     define the join window in which the verification is done.
> 5)  From the point of view of the condition, the join behaves like an INNER JOIN
>     (a value is emitted only if it exists on the other side).
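A hypothetical direct implementation of the per-function checks from item 2), evaluated against the collection produced by the inner query. The sketch ignores SQL's three-valued NULL logic and takes the comparison operator as a parameter; SOME is a synonym of ANY in standard SQL, so it needs no separate method:

```java
import java.util.Collection;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical checks applied in the join to the collection from the inner query.
// NULL handling (three-valued logic) is deliberately ignored.
class SubqueryPredicates {
    static <T> boolean in(T value, Collection<T> coll) {
        return coll.contains(value);
    }

    static boolean exists(Collection<?> coll) {
        return !coll.isEmpty();
    }

    // value op ANY (coll), also written SOME: false on an empty collection.
    static <T> boolean any(T value, BiPredicate<T, T> op, Collection<T> coll) {
        return coll.stream().anyMatch(c -> op.test(value, c));
    }

    // value op ALL (coll): true on an empty collection.
    static <T> boolean all(T value, BiPredicate<T, T> op, Collection<T> coll) {
        return coll.stream().allMatch(c -> op.test(value, c));
    }

    public static void main(String[] args) {
        List<Integer> salaries = List.of(4600, 5000, 7000);
        BiPredicate<Integer, Integer> gt = (a, b) -> a > b;
        System.out.println(in(5000, salaries));                // true
        System.out.println(any(4800, gt, salaries));           // true  (4800 > 4600)
        System.out.println(all(4800, gt, salaries));           // false (4800 <= 5000)
        System.out.println(all(4800, gt, List.<Integer>of())); // true  (vacuously)
        System.out.println(exists(List.of()));                 // false
    }
}
```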
> !in.png!
> General logic of Join
> ---------------------
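> ```
> // Both sides get the same constant key, so every left element is paired
> // with every right element that falls into the same window.
> leftDataStream.join(rightDataStream)
>     .where(new ConstantConditionSelector())
>     .equalTo(new ConstantConditionSelector())
>     .window([TIME/COUNT][TUMBLE/SLIDE]window.create())  // one of the window options
>     //.trigger(new DefaultTrigger())
>     //.evictor(new DefaultEvictor())
>     // emits a left element only if it passes the IN/ANY/... check
>     .apply(new FlatJoinFunctionWithInSelection());
> ```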



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
