flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
Date Wed, 15 Feb 2017 17:35:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868220#comment-15868220 ]

Fabian Hueske commented on FLINK-5655:
--------------------------------------

Hi [~wheat9], 

First of all, the term sliding window is a bit overloaded. What we call a Sliding (Group) Window
in the DataStream API is not the same as a Sliding (Row) Window (hence your examples (1) and
(2) are not semantically equivalent!). I think the sliding row window semantics are more
common, but the term has already been coined differently in Flink and I don't think there is consensus
to change that. For example, this document from the Calcite community calls what Flink calls
"Sliding Windows" "Hopping Windows": http://calcite.apache.org/docs/stream.html

Sorry for the confusion. 
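To make the distinction concrete, here is a small Python sketch (not Flink code; the input tuples, the 5-minute size and the 1-minute slide are illustrative assumptions) that evaluates both kinds of windows over the same four rows. The sliding group window assigns each row to every window that contains it and emits one result per key and window. The sliding row window, i.e. an OVER window with a RANGE frame, emits exactly one result per input row.

```python
from collections import defaultdict

# Hypothetical input: (key, event_time_in_minutes, value).
EVENTS = [("a", 0, 1), ("a", 2, 2), ("a", 3, 3), ("a", 7, 4)]
SIZE = 5   # window size in minutes
SLIDE = 1  # slide in minutes (group window only)


def sliding_group_windows(events, size, slide):
    """Flink's Sliding (Group) Window, Calcite's Hopping Window.

    Windows are [start, start + size) with start a multiple of `slide`;
    one SUM per (key, window start) is emitted.
    """
    sums = defaultdict(int)
    for key, t, v in events:
        # All window starts in (t - size, t] contain t.
        for i in range((t - size) // slide + 1, t // slide + 1):
            sums[(key, i * slide)] += v
    return dict(sorted(sums.items()))


def sliding_row_window(events, size):
    """Sliding (Row) Window, i.e. SUM(v) OVER (PARTITION BY key ORDER BY t
    RANGE BETWEEN size PRECEDING AND CURRENT ROW).

    One result per input row, covering rows of the same key with t in [t - size, t].
    """
    return [(key, t, sum(v2 for k2, t2, v2 in events
                         if k2 == key and t - size <= t2 <= t))
            for key, t, _ in events]


if __name__ == "__main__":
    print(len(sliding_group_windows(EVENTS, SIZE, SLIDE)))  # 12 results
    print(sliding_row_window(EVENTS, SIZE))                 # 4 results
```

For these four rows the group-window variant produces twelve window results and the row-window variant only four, which is one way to see why the two are not interchangeable.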

It is possible to define sliding group windows (as described in FLIP-11) in SQL; however,
it is a bit cumbersome.
For instance, a sliding window of size 5 minutes that slides every minute could be defined
as:

{code}
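-- Stage 2: final aggregate over the current 1-minute bucket and the 4 preceding ones (5 minutes in total)
SELECT SUM(b) OVER (PARTITION BY a ORDER BY rowtime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW)
FROM (
  -- Stage 1: partial aggregates per key and 1-minute tumbling window
  SELECT a, SUM(b) AS b, MAX(rowtime) AS rowtime
  FROM tab
  GROUP BY a, FLOOR(rowtime TO MINUTE)
)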
{code}

This query first computes partial aggregates using a tumbling window and then the
final aggregates using a row window based on row counts.
However, there are a few issues with that:
- We do not want to support event-time OVER ROW windows because they might cause very expensive
updates for late data.
- This query is very hard to translate to Flink's built-in windows (or the Table API windows) because
the logic is distributed across several operators.
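The two-stage evaluation described above can be mimicked in plain Python. This is a sketch under assumptions, not how Flink executes the query: rowtime is given in seconds, buckets are identified by their minute number instead of MAX(rowtime) (which yields the same per-key ordering), and the helper names are hypothetical. A frame of n PRECEDING AND CURRENT ROW covers n + 1 buckets, so a 5-minute window corresponds to n = 4.

```python
from collections import defaultdict

# Hypothetical input: (a, rowtime_in_seconds, b); one row per minute for
# minutes 0-4, then a gap until minute 10.
EVENTS = [("x", 0, 1), ("x", 60, 1), ("x", 120, 1),
          ("x", 180, 1), ("x", 240, 1), ("x", 600, 1)]


def two_stage(events, rows_preceding):
    """Returns {(a, minute): SUM(b) OVER (... ROWS BETWEEN n PRECEDING AND CURRENT ROW)}."""
    # Stage 1: GROUP BY a, FLOOR(rowtime TO MINUTE) -> one partial sum per bucket.
    partial = defaultdict(int)
    for a, ts, b in events:
        partial[(a, ts // 60)] += b
    # Stage 2: per key, order buckets by time and sum the current bucket
    # plus up to `rows_preceding` earlier buckets.
    by_key = defaultdict(list)
    for (a, minute), s in sorted(partial.items()):
        by_key[a].append((minute, s))
    result = {}
    for a, buckets in by_key.items():
        for i, (minute, _) in enumerate(buckets):
            frame = buckets[max(0, i - rows_preceding): i + 1]
            result[(a, minute)] = sum(s for _, s in frame)
    return result


def sliding_group_sum(events, key, end_minute, size_minutes=5):
    """Reference: SUM(b) of a 5-minute sliding group window ending with `end_minute`."""
    return sum(b for a, ts, b in events
               if a == key and end_minute - size_minutes < ts // 60 <= end_minute)


if __name__ == "__main__":
    rows = two_stage(EVENTS, rows_preceding=4)
    print(rows[("x", 4)], sliding_group_sum(EVENTS, "x", 4))    # 5 5
    print(rows[("x", 10)], sliding_group_sum(EVENTS, "x", 10))  # 5 1
```

With one row per minute the result matches a 5-minute sliding group window (minute 4 above). Because the ROWS frame counts rows rather than time, a gap in the data makes it reach further back: the result at minute 10 still includes minutes 1 to 4. The two-stage query also only produces results for minutes that contain data.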

Hope this helps, Fabian


> Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> ----------------------------------------------------------------
>
>                 Key: FLINK-5655
>                 URL: https://issues.apache.org/jira/browse/FLINK-5655
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Shaoxuan Wang
>
> The goal of this issue is to add support for OVER RANGE aggregations on event-time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a parameterless scalar function that just indicates event-time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5658).
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow-up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER RANGE aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).
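The requested OVER RANGE semantics can be illustrated with a small Python sketch. It assumes that rows arrive already sorted by rowtime in milliseconds (i.e. late data has been handled or dropped upstream) and keeps a per-partition buffer of rows that are still within the one-hour range. It is not a proposal for the actual DataStream operator; the tuple layout and the function name are hypothetical. Rows with equal rowtime are buffered before any result is emitted because, in RANGE mode, CURRENT ROW includes all peers with the same ORDER BY value.

```python
from collections import defaultdict, deque
from itertools import groupby

HOUR_MS = 60 * 60 * 1000

# Hypothetical input: (a, b, c, rowtime_ms), already sorted by rowtime.
ROWS = [("r1", 5, "k", 0),
        ("r2", 3, "k", 30 * 60 * 1000),
        ("r3", 7, "k", HOUR_MS),
        ("r4", 1, "j", HOUR_MS),
        ("r5", 2, "k", HOUR_MS + 1)]


def over_range_hour(rows):
    """Emits (a, sumB, minB) per row for OVER (PARTITION BY c ORDER BY rowtime
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW).
    """
    buffers = defaultdict(deque)  # partition key c -> deque of (rowtime, b)
    out = []
    for ts, peers in groupby(rows, key=lambda r: r[3]):
        peers = list(peers)
        # Buffer all peers first: in RANGE mode CURRENT ROW includes every
        # row with the same ORDER BY value.
        for _, b, c, _ in peers:
            buffers[c].append((ts, b))
        for a, _, c, _ in peers:
            buf = buffers[c]
            # Drop rows older than ts - 1 hour; the lower bound itself is kept.
            while buf[0][0] < ts - HOUR_MS:
                buf.popleft()
            values = [v for _, v in buf]
            out.append((a, sum(values), min(values)))
    return out


if __name__ == "__main__":
    for row in over_range_hour(ROWS):
        print(row)
```

In the example, r3 arrives exactly one hour after r1 and still sees it because the lower bound of the range is inclusive; r5, one millisecond later, no longer does. Without PARTITION BY, every row would go to the same buffer.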



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
