flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime
Date Thu, 30 Mar 2017 08:48:42 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15948683#comment-15948683 ]

Fabian Hueske commented on FLINK-6073:
--------------------------------------

Hi [~rtudoran], 

I agree, the query I proposed would not be easy to optimize. We would need some well-designed
rules to transform the logical plan into an efficient execution plan.
However, the query produces the result semantics that you described in the JIRA. IMO,
the semantics of the queries are fixed and we cannot change them.
What we can do, though, is focus first on different join semantics that are easier to implement.

The query you proposed in 1) 

{code}
SELECT T1.A1,
   (SELECT Max(T2.B1) OVER (ORDER BY T2.B4 RANGE INTERVAL '1' HOUR PRECEDING) FROM T2)
 FROM T1 
{code}

has different semantics than the original query and would fail on a stream because the inner
query {{(SELECT Max(T2.B1) OVER (ORDER BY T2.B4 RANGE INTERVAL '1' HOUR PRECEDING) FROM T2)}}
would return more than a single row (in fact, one row for each stream record). If the goal
is to have different exchange rates in the result, the intermediate result that we join with
(i.e., the result of the subquery) needs to have at least one row for each exchange
rate. Hence, we cannot implement it as an inner query in a {{SELECT}} clause but need a regular
join with the other table.
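The cardinality rule behind this can be sketched in a few lines of Java (9+, for {{List.of}}). In standard SQL, a subquery used as a column in a {{SELECT}} clause is collapsed into a single value. No rows yield NULL, one row yields that row, and more than one row is a runtime error. The names {{ScalarSubquery}} and {{singleValue}} are hypothetical. They only illustrate these semantics and are not Calcite's or Flink's implementation.

```java
import java.util.List;

// Hypothetical illustration of standard SQL scalar-subquery semantics:
// a subquery used as a SELECT column must produce at most one row.
class ScalarSubquery {

    // Collapses the subquery result into one value:
    // no rows -> null (SQL NULL), one row -> that row, more rows -> error.
    static <T> T singleValue(List<T> rows) {
        if (rows.size() > 1) {
            throw new IllegalStateException(
                "scalar subquery returned " + rows.size() + " rows");
        }
        return rows.isEmpty() ? null : rows.get(0);
    }

    public static void main(String[] args) {
        // A static table holding exactly one rate works as expected.
        System.out.println(singleValue(List.of(1.2)));

        // An OVER-window aggregate emits one row per input record, so on a
        // stream the subquery result soon holds more than one row.
        List<Double> rollingMaxPerRecord = List.of(1.2, 1.3, 1.3);
        try {
            singleValue(rollingMaxPerRecord);
        } catch (IllegalStateException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```

On a stream, the second case is the normal one. That is why the subquery has to become a regular join input instead.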

The second query that you proposed suffers from the same problem. The subquery returns more
than a single row.

We could do something like

{code}
SELECT amount, maxRate
FROM T1, 
  (SELECT TUMBLE_END(time, INTERVAL '1' HOUR) AS hour, MAX(exchange) AS maxRate
   FROM T2
   GROUP BY TUMBLE(time, INTERVAL '1' HOUR)) rates
WHERE CEIL(T1.time TO HOUR) = rates.hour
{code}

This would join each amount with the maxRate observed within the same hour (e.g., an amount
recorded at 11:58:00 is joined with the maximum rate observed between 11:00:00 and 11:59:59).
But this would not be the result you asked for.
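The semantics of this query can be illustrated with a small in-memory Java sketch (Java 16+, for records). Rates are bucketed by the end of their one-hour tumbling window, mirroring {{TUMBLE_END}}, and only the maximum per bucket is kept. Each amount is then matched against the bucket whose end equals its timestamp rounded up to the full hour, mirroring {{CEIL(... TO HOUR)}}. The sketch makes a few assumptions: timestamps are epoch seconds, and the record names {{Amount}}, {{Rate}} and {{Joined}} are hypothetical. Flink would evaluate the query incrementally rather than over complete lists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of the tumbling-window join above.
// Timestamps are epoch seconds (an assumption made for this sketch).
class HourlyMaxRateJoin {
    static final long HOUR = 3600L;

    record Amount(long time, double amount) {}
    record Rate(long time, double exchange) {}
    record Joined(double amount, double maxRate) {}

    // Exclusive end of the one-hour tumbling window containing t (like TUMBLE_END).
    static long windowEnd(long t) {
        return Math.floorDiv(t, HOUR) * HOUR + HOUR;
    }

    // t rounded up to the full hour (like CEIL(t TO HOUR)); exact hours stay unchanged.
    static long ceilHour(long t) {
        return -Math.floorDiv(-t, HOUR) * HOUR;
    }

    static List<Joined> join(List<Amount> amounts, List<Rate> rates) {
        // Subquery "rates": MAX(exchange) per window, keyed by the window end ("hour").
        Map<Long, Double> maxRateByHour = new HashMap<>();
        for (Rate r : rates) {
            maxRateByHour.merge(windowEnd(r.time()), r.exchange(), Math::max);
        }
        // Outer query: inner join on CEIL(T1.time TO HOUR) = rates.hour.
        List<Joined> result = new ArrayList<>();
        for (Amount a : amounts) {
            Double maxRate = maxRateByHour.get(ceilHour(a.time()));
            if (maxRate != null) {
                result.add(new Joined(a.amount(), maxRate));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long h11 = 11 * HOUR;
        List<Amount> amounts = List.of(new Amount(h11 + 58 * 60, 100.0)); // 11:58:00
        List<Rate> rates = List.of(new Rate(h11 + 5 * 60, 1.1),            // 11:05:00
                                   new Rate(h11 + 30 * 60, 1.3));          // 11:30:00
        System.out.println(join(amounts, rates));
    }
}
```

Each amount only sees the maximum of its own hour, not the latest rate. Because {{CEIL}} leaves exact hours unchanged, an amount stamped exactly at 12:00:00 is matched with the window that ends at 12:00:00, i.e., the 11:00 to 12:00 hour.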





> Support for SQL inner queries for proctime
> ------------------------------------------
>
>                 Key: FLINK-6073
>                 URL: https://issues.apache.org/jira/browse/FLINK-6073
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: radu
>            Assignee: radu
>            Priority: Critical
>              Labels: features
>         Attachments: innerquery.png
>
>
> Time target: Proc Time
> **SQL targeted query examples:**
>  
> Q1) `Select item, (select item2 from stream2) as itemExtern from stream1;`
> Comments: This is the main functionality targeted by this JIRA: combining the results of an inner query into the main query.
> Q2) `Select s1.item, (Select a2 from table as t2 where t2.id = s1.id limit 1) from s1;`
> Comments:
> Another equivalent way to write the first inner-query example is with `limit 1`. This ensures equivalence with the SingleElementAggregation used when translating the main target syntax for inner queries. We must ensure that both syntaxes are supported and implemented with the same functionality.
> The inner query could also select elements from a table, not just from a different stream. Implementing this support should be tracked in a sub-JIRA issue.
> **Description:**
> When parsed by Calcite, the SQL inner query is translated into a join (a left join with an always-true condition) between the output of the query on the main stream and the output of a single-value aggregation over the inner query. The translation logic is shown below:
> ```
> LogicalJoin [condition=true;type=LEFT]
>     LogicalSingleValue[type=aggregation]
>         …logic of inner query (LogicalProject, LogicalScan…)
>     …logic of main, external query (LogicalProject, LogicalScan…)
> ```
> `LogicalJoin[condition=true;type=LEFT]`: this can be considered a special-case operation rather than a proper stream-to-stream join. The implementation should attach a value from a different query to each output of the main stream.
> `LogicalSingleValue[type=aggregation]`: this can be interpreted as the holder of the single value that results from the inner query. Since this operator guarantees that the inner query contributes no more than one value to the join, there are several options for interpreting its functionality in the streaming context:
> 1. Throw an error if the inner query returns more than one result. This is the typical behavior of standard SQL over a database. However, a stream is very unlikely to emit only a single value, so this behavior would be very limiting for streams in the inner query. It might be more useful and common if the inner query is over a table.
> 2. We can interpret this operator as a guarantee that only one value is selected at any given moment. The behavior then resembles a filter that selects one value. As a consequence, the output of this operator evolves over time with the second stream, which drives the inner query. When the value evolves should depend on what marks the progress of that stream (processing time, watermarks/event time, ingestion time, window time partitions…).
> In this JIRA issue, the evolution is marked by processing time, and the operator works according to option 2. Hence, at every moment, the operator state that holds the single value can be updated with the latest element. In this way, the inner query always selects the last element (its fields, or other query-related transformations based on the last value). This behavior is needed in many scenarios. A typical example is computing total income when incomes arrive in multiple currencies and the total must be computed in one currency, always using the latest exchange rate.
> This behavior is also motivated by the functionality of the third SQL query example, Q3 (using an inner query as the input source for FROM). In such scenarios, the selection in the main query would need to be based on the latest elements. With this behavior, both types of queries (Q1 and Q3) would provide the same, intuitive result.
> **Functionality example**
> Based on the logical translation plan, the following example shows the behavior of the inner query applied to two streams that operate on processing time.
> SELECT amount, (SELECT exchange FROM inputstream1) AS field1 FROM inputstream2
> ||Time||Stream1 (user, amount)||Stream2 (exchange)||Output (amount, field1)||
> |T1| |1.2| |
> |T2|User1,10| |(10,1.2)|
> |T3|User2,11| |(11,1.2)|
> |T4| |1.3| |
> |T5|User3,9| |(9,1.3)|
> |...| | | |
> Note 1. For streams that operate on event time, at moment T4 (when the new exchange rate arrives) we would need to retract the previous outputs ((10,1.2), (11,1.2)) and re-emit them as ((10,1.3), (11,1.3)).
> Note 2. Rather than failing when a new value arrives in the inner query, we just update the state that holds the single value. If option 1 is chosen for the behavior of LogicalSingleValue, then an error should be triggered at moment T4.
> **Implementation option**
> Considering the notes and the chosen behavior option, the operator would be implemented using Flink's join function with a custom always-true join condition. An inner selection of the output, based on the incoming direction, mimics the left join. The single-value selection can be implemented with a stateful flatMap. If the join is executed in parallel by multiple operator instances, there are two options. Option 1 uses a parallelism of 1 for the stateful flatMap. Option 2 broadcasts the outputs of the flatMap to all join instances to keep the results consistent. Since selecting a single value in the flatMap is lightweight, option 1 is preferable. The design schema is shown below.
> !innerquery.png!
> **General logic of Join**
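> ```java
> // ConstantConditionSelector, LeftFireTrigger, SingleValueEvictor and InnerQueryJoinFunction are
> // hypothetical custom classes of this design (not part of Flink); window.create() is a placeholder assigner.
> leftDataStream.join(rightDataStream)
>     .where(new ConstantConditionSelector())    // same constant key for every left element...
>     .equalTo(new ConstantConditionSelector())  // ...and every right element: always-true condition
>     .window(window.create())
>     .trigger(new LeftFireTrigger())            // custom trigger, presumably firing on left (main-stream) elements
>     .evictor(new SingleValueEvictor())
>     .apply(new InnerQueryJoinFunction());      // attaches the inner-query value to the left element
> ```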
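The processing-time behavior proposed in the issue is option 2, with option 1 as the stricter alternative. It can be sketched in plain Java, independent of Flink's windowed join, as a left join whose right side is a single-value state. The class `SingleValueLeftJoin`, its `Policy` enum, and the callbacks `onInnerValue`/`onMainValue` are hypothetical names (Java 16+, for records). In the proposed design, this logic would be spread across the stateful flatMap and the custom trigger and join function.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed processing-time operator: a left join with
// an always-true condition against a single-value state fed by the inner query.
class SingleValueLeftJoin {
    enum Policy { KEEP_LATEST, FAIL_ON_SECOND_VALUE } // option 2 / option 1

    record Output(double amount, Double exchange) {}   // exchange is null if no value yet

    private final Policy policy;
    private Double currentExchange;                     // the single value held by the operator
    final List<Output> emitted = new ArrayList<>();

    SingleValueLeftJoin(Policy policy) { this.policy = policy; }

    // Element from the inner-query stream (exchange rates): only updates the state.
    void onInnerValue(double exchange) {
        if (currentExchange != null && policy == Policy.FAIL_ON_SECOND_VALUE) {
            throw new IllegalStateException("inner query produced more than one value");
        }
        currentExchange = exchange; // earlier outputs are not revised (processing time)
    }

    // Element from the main stream: emitted immediately with the current single value;
    // left-join semantics keep the row even if no value has arrived yet.
    void onMainValue(double amount) {
        emitted.add(new Output(amount, currentExchange));
    }

    public static void main(String[] args) {
        // Replays the functionality example, T1..T5.
        SingleValueLeftJoin op = new SingleValueLeftJoin(Policy.KEEP_LATEST);
        op.onInnerValue(1.2); // T1
        op.onMainValue(10);   // T2 (User1)
        op.onMainValue(11);   // T3 (User2)
        op.onInnerValue(1.3); // T4
        op.onMainValue(9);    // T5 (User3)
        System.out.println(op.emitted);
    }
}
```

With `KEEP_LATEST`, the replay emits (10, 1.2), (11, 1.2) and (9, 1.3), matching the Output column of the example. With `FAIL_ON_SECOND_VALUE`, the same replay throws when the second rate (1.3) arrives.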



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
