From: fhueske@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: flink git commit: [FLINK-5386] [table] Refactor window clause.
Date: Fri, 20 Jan 2017 15:40:55 +0000 (UTC)

Repository: flink
Updated Branches:
  refs/heads/release-1.2 0a24611a8 -> be09143cd


[FLINK-5386] [table] Refactor window clause.

- move window() before groupBy()
- make window alias mandatory
- groupBy() must include window alias


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be09143c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be09143c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be09143c

Branch: refs/heads/release-1.2
Commit: be09143cd323d478811447b10807ae7f7d6a4b7b
Parents: 0a24611
Author: Jincheng Sun
Authored: Thu Dec 15 10:31:33 2016 +0800
Committer: Fabian Hueske
Committed: Fri Jan 20 16:40:21 2017 +0100

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  71 ++---
 .../org/apache/flink/table/api/table.scala      | 100 ++++---
 .../org/apache/flink/table/api/windows.scala    | 175 ++++--------
 .../flink/table/plan/logical/groupWindows.scala |  69 +++--
 .../scala/batch/table/FieldProjectionTest.scala |  14 +-
 .../scala/stream/table/AggregationsITCase.scala |  15 +-
 .../scala/stream/table/GroupWindowTest.scala    | 277 ++++++++++++++-----
 7 files changed, 429 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/be09143c/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 0d7331b..80b61f9 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1006,69 +1006,74 @@ Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTH
 
 ### Windows
 
-The Table API is a declarative API to define queries on batch and streaming tables. Projection, selection, and union operations can be applied both on streaming and batch tables without additional semantics. Aggregations on (possibly) infinite streaming tables, however, can only be computed on finite groups of records. Group-window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, group-windows are a convenient shortcut to group records by time intervals.
+The Table API is a declarative API to define queries on batch and streaming tables. Projection, selection, and union operations can be applied both on streaming and batch tables without additional semantics. Aggregations on (possibly) infinite streaming tables, however, can only be computed on finite groups of records. Window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group.
 
-Group-windows are defined using the `window(w: GroupWindow)` clause. The following example shows how to define a group-window aggregation on a table.
+**Note:** Windows are currently only supported for streaming tables. Support for batch tables will be added in the next release.
+
+Windows are defined using the `window(w: Window)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute.
+The following example shows how to define a window aggregation on a table.
 {% highlight java %}
 Table table = input
-  .window(GroupWindow w)  // define window
-  .select("b.sum")        // aggregate
+  .window([Window w].as("w"))  // define window with alias w
+  .groupBy("w")                // group the table by window w
+  .select("b.sum")             // aggregate
 {% endhighlight %}

 {% highlight scala %}
 val table = input
-  .window(w: GroupWindow) // define window
-  .select('b.sum)         // aggregate
+  .window([w: Window] as 'w) // define window with alias w
+  .groupBy('w)               // group the table by window w
+  .select('b.sum)            // aggregate
 {% endhighlight %}
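The two rules introduced above (a window must carry an alias, and `groupBy(...)` must reference that alias exactly once) can be modeled outside Flink. This is a minimal sketch, not Flink code: the class `WindowGroupByCheck` and its methods are hypothetical, fields are plain strings, and `IllegalArgumentException` stands in for Flink's `ValidationException`. The logic mirrors the checks in `Table.window` and `WindowedTable.groupBy` from the `table.scala` diff in this commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of the alias rules added by this commit.
final class WindowGroupByCheck {

    private WindowGroupByCheck() {}

    // Mirrors Table.window(...): a window without an alias is rejected.
    static String requireAlias(String alias) {
        if (alias == null || alias.isEmpty()) {
            throw new IllegalArgumentException("An alias must be specified for the window.");
        }
        return alias;
    }

    // Mirrors WindowedTable.groupBy(...): remove every occurrence of the window
    // alias and require that exactly one field was removed. The remaining fields
    // are the additional grouping keys.
    static List<String> groupKeys(String windowAlias, List<String> groupByFields) {
        List<String> keys = new ArrayList<>();
        for (String field : groupByFields) {
            if (!field.equals(windowAlias)) {
                keys.add(field);
            }
        }
        if (groupByFields.size() != keys.size() + 1) {
            throw new IllegalArgumentException("GroupBy must contain exactly one window alias.");
        }
        return keys;
    }
}
```

With this model, `groupKeys("w", Arrays.asList("w", "a"))` returns `[a]`, while a `groupBy` list that omits the alias (`["a"]`) or repeats it (`["w", "w"]`) is rejected.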
-In streaming environments, group-window aggregates can only be computed in parallel, if they are *keyed*, i.e., there is an additional `groupBy` attribute. Group-window aggregates without additional `groupBy`, such as in the example above, can only be evaluated in a single, non-parallel task. The following example shows how to define a keyed group-window aggregation on a table.
+In streaming environments, window aggregates can only be computed in parallel if they group on one or more attributes in addition to the window, i.e., the `groupBy(...)` clause references a window alias and at least one additional attribute. A `groupBy(...)` clause that only references a window alias (such as in the example above) can only be evaluated by a single, non-parallel task.
+The following example shows how to define a window aggregation with additional grouping attributes.
 {% highlight java %}
 Table table = input
-  .groupBy("a")
-  .window(GroupWindow w)  // define window
-  .select("a, b.sum")     // aggregate
+  .window([Window w].as("w"))  // define window with alias w
+  .groupBy("w, a")             // group the table by attribute a and window w
+  .select("a, b.sum")          // aggregate
 {% endhighlight %}

 {% highlight scala %}
 val table = input
-  .groupBy('a)
-  .window(w: GroupWindow) // define window
-  .select('a, 'b.sum)     // aggregate
+  .window([w: Window] as 'w) // define window with alias w
+  .groupBy('w, 'a)           // group the table by attribute a and window w
+  .select('a, 'b.sum)        // aggregate
 {% endhighlight %}
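The parallelism rule described above can be illustrated with a small model. It is a hypothetical sketch, not Flink code: `effectiveParallelism` encodes the documented behavior (no grouping attribute besides the window alias means a single task), and `taskFor` uses plain hash-modulo routing as a simplified stand-in for spreading keyed rows over parallel tasks. Flink's actual key-group assignment is different.

```java
import java.util.List;

// Hypothetical model of the documented parallelism rule for window aggregates.
// "groupKeys" are the groupBy(...) fields that remain after removing the window alias.
final class WindowParallelism {

    private WindowParallelism() {}

    // groupBy('w) alone: there is no key to partition on, so one task does all the work.
    // groupBy('w, 'a): rows can be partitioned by 'a across the configured tasks.
    static int effectiveParallelism(List<String> groupKeys, int configuredParallelism) {
        if (configuredParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        return groupKeys.isEmpty() ? 1 : configuredParallelism;
    }

    // Simplified hash-modulo routing of one row to a task index, based on the value
    // of its grouping key. This is NOT Flink's key-group assignment.
    static int taskFor(Object keyValue, int parallelism) {
        return Math.floorMod(keyValue.hashCode(), parallelism);
    }
}
```

Because `taskFor` depends only on the key value, all rows with the same value of `a` reach the same task, which is what lets each task evaluate its windows independently.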
-The `GroupWindow` parameter defines how rows are mapped to windows. `GroupWindow` is not an interface that users can implement. Instead, the Table API provides a set of predefined `GroupWindow` classes with specific semantics, which are translated into underlying `DataStream` or `DataSet` operations. The supported window definitions are listed below.
-By assigning the group-window an alias using `as`, properties such as the start and end timestamp of a time window can be accessed in the `select` statement.
+The `Window` parameter defines how rows are mapped to windows. `Window` is not an interface that users can implement. Instead, the Table API provides a set of predefined `Window` classes with specific semantics, which are translated into underlying `DataStream` or `DataSet` operations. The supported window definitions are listed below. Window properties such as the start and end timestamp of a time window can be added in the select statement as a property of the window alias as `w.start` and `w.end`, respectively.
 {% highlight java %}
 Table table = input
-  .groupBy("a")
-  .window(XXX.as("myWin"))                      // define window alias
-  .select("a, myWin.start, myWin.end, b.count") // aggregate
+  .window([Window w].as("w"))            // define window with alias w
+  .groupBy("w, a")                       // group the table by attribute a and window w
+  .select("a, w.start, w.end, b.count")  // aggregate and add window start and end timestamps
 {% endhighlight %}

 {% highlight scala %}
 val table = input
-  .groupBy('a)
-  .window(XXX as 'myWin)                          // define window alias
-  .select('a, 'myWin.start, 'myWin.end, 'b.count) // aggregate
+  .window([w: Window] as 'w)                // define window with alias w
+  .groupBy('w, 'a)                          // group the table by attribute a and window w
+  .select('a, 'w.start, 'w.end, 'b.count)   // aggregate and add window start and end timestamps
 {% endhighlight %}
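How `w.start` and `w.end` relate to a row's timestamp can be sketched for tumbling and sliding time windows. This is a hypothetical helper (`WindowBounds` is not part of Flink), assuming windows aligned to epoch 0 with no offset, millisecond `long` timestamps, and an exclusive window end; the values Flink reports may differ in these details.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how w.start / w.end can be derived for time windows.
// Assumptions: alignment to epoch 0, no offset, exclusive end.
final class WindowBounds {

    private WindowBounds() {}

    // Start of the tumbling window of the given size that contains the timestamp.
    static long tumblingStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Exclusive end of that tumbling window.
    static long tumblingEnd(long timestamp, long size) {
        return tumblingStart(timestamp, size) + size;
    }

    // Starts of all sliding windows (size, slide) that contain the timestamp,
    // from the most recent window to the oldest one.
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For example, with a 5 ms tumbling window a row at timestamp 7 gets `w.start = 5` and `w.end = 10`; with a 10 ms window sliding every 5 ms, the same row belongs to the windows starting at 5 and at 0.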
@@ -1096,13 +1101,13 @@ Tumbling windows are defined by using the `Tumble` class as follows:
     on
-      Required for streaming event-time windows and windows on batch tables.
-      Defines the time mode for streaming tables (rowtime is a logical system attribute); for batch tables, the time attribute on which records are grouped.
+      Required for streaming event-time windows.
+      Defines the time mode for streaming tables (rowtime is a logical system attribute).
     as
-      Optional.
-      Assigns an alias to the window that can be used in the following select() clause to access window properties such as window start or end time.
+      Required.
+      Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start or end time in the select() clause.
@@ -1163,13 +1168,13 @@ Sliding windows are defined by using the `Slide` class as follows:
     on
-      Required for event-time windows and windows on batch tables.
-      Defines the time mode for streaming tables (rowtime is a logical system attribute); for batch tables, the time attribute on which records are grouped
+      Required for event-time windows.
+      Defines the time mode for streaming tables (rowtime is a logical system attribute).
     as
-      Optional.
-      Assigns an alias to the window that can be used in the following select() clause to access window properties such as window start or end time.
+      Required.
+      Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start or end time in the select() clause.
@@ -1225,13 +1230,13 @@ A session window is defined by using the `Session` class as follows:
     on
-      Required for event-time windows and windows on batch tables.
-      Defines the time mode for streaming tables (rowtime is a logical system attribute); for batch tables, the time attribute on which records are grouped
+      Required for event-time windows.
+      Defines the time mode for streaming tables (rowtime is a logical system attribute).
     as
-      Optional.
-      Assigns an alias to the window that can be used in the following select() clause to access window properties such as window start or end time.
+      Required.
+      Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start or end time in the select() clause.


http://git-wip-us.apache.org/repos/asf/flink/blob/be09143c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 6322026..4b197e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -775,23 +775,25 @@ class Table(
   /**
     * Groups the records of a table by assigning them to windows defined by a time or row interval.
     *
-    * For streaming tables of infinite size, grouping into windows is required to define finite
-    * groups on which group-based aggregates can be computed.
+    * Windows are currently only supported for streaming tables.
+    * They are required to define finite groups on which group-based aggregates can be computed.
     *
-    * For batch tables of finite size, windowing essentially provides shortcuts for time-based
-    * groupBy.
+    * __Note__: Computing windowed aggregates on a streaming table is only a parallel operation
+    * if additional grouping attributes are added to the `groupBy(...)` clause.
+    * If the `groupBy(...)` only references a window alias, the streamed table will be processed
+    * by a single task, i.e., with parallelism 1.
     *
-    * __Note__: window on non-grouped streaming table is a non-parallel operation, i.e., all data
-    * will be processed by a single operator.
-    *
-    * @param groupWindow group-window that specifies how elements are grouped.
+    * @param window window that specifies how elements are grouped.
     * @return A windowed table.
     */
-  def window(groupWindow: GroupWindow): GroupWindowedTable = {
+  def window(window: Window): WindowedTable = {
     if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
       throw new ValidationException(s"Windows on batch tables are currently not supported.")
     }
-    new GroupWindowedTable(this, Seq(), groupWindow)
+    if (window.alias.isEmpty) {
+      throw new ValidationException("An alias must be specified for the window.")
+    }
+    new WindowedTable(this, window)
   }
 }
 
@@ -844,60 +846,96 @@ class GroupedTable(
     val fieldExprs = ExpressionParser.parseExpressionList(fields)
     select(fieldExprs: _*)
   }
+}
+
+class WindowedTable(
+    private[flink] val table: Table,
+    private[flink] val window: Window) {
 
   /**
-    * Groups the records of a table by assigning them to windows defined by a time or row interval.
+    * Groups the elements by a mandatory window and one or more optional grouping attributes.
+    * The window is specified by referring to its alias.
     *
-    * For streaming tables of infinite size, grouping into windows is required to define finite
-    * groups on which group-based aggregates can be computed.
+    * If no additional grouping attribute is specified and if the input is a streaming table,
+    * the aggregation will be performed by a single task, i.e., with parallelism 1.
     *
-    * For batch tables of finite size, windowing essentially provides shortcuts for time-based
-    * groupBy.
+    * Aggregations are performed per group and defined by a subsequent `select(...)` clause similar
+    * to SQL SELECT-GROUP-BY query.
     *
-    * @param groupWindow group-window that specifies how elements are grouped.
-    * @return A windowed table.
+    * Example:
+    *
+    * {{{
+    *   tab.window([window] as 'w).groupBy('w, 'key).select('key, 'value.avg)
+    * }}}
     */
-  def window(groupWindow: GroupWindow): GroupWindowedTable = {
-    if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
-      throw new ValidationException(s"Windows on batch tables are currently not supported.")
+  def groupBy(fields: Expression*): WindowGroupedTable = {
+    val fieldsWithoutWindow = fields.filterNot(window.alias.get.equals(_))
+    if (fields.size != fieldsWithoutWindow.size + 1) {
+      throw new ValidationException("GroupBy must contain exactly one window alias.")
     }
-    new GroupWindowedTable(table, groupKey, groupWindow)
+
+    new WindowGroupedTable(table, fieldsWithoutWindow, window)
   }
+
+  /**
+    * Groups the elements by a mandatory window and one or more optional grouping attributes.
+    * The window is specified by referring to its alias.
+    *
+    * If no additional grouping attribute is specified and if the input is a streaming table,
+    * the aggregation will be performed by a single task, i.e., with parallelism 1.
+    *
+    * Aggregations are performed per group and defined by a subsequent `select(...)` clause similar
+    * to SQL SELECT-GROUP-BY query.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.window([window].as("w")).groupBy("w, key").select("key, value.avg")
+    * }}}
+    */
+  def groupBy(fields: String): WindowGroupedTable = {
+    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
+    groupBy(fieldsExpr: _*)
+  }
+
 }
 
-class GroupWindowedTable(
+class WindowGroupedTable(
     private[flink] val table: Table,
-    private[flink] val groupKey: Seq[Expression],
-    private[flink] val window: GroupWindow) {
+    private[flink] val groupKeys: Seq[Expression],
+    private[flink] val window: Window) {
 
   /**
-    * Performs a selection operation on a windowed table. Similar to an SQL SELECT statement.
+    * Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement.
     * The field expressions can contain complex expressions and aggregations.
     *
     * Example:
     *
     * {{{
-    *   groupWindowTable.select('key, 'window.start, 'value.avg + " The average" as 'average)
+    *   windowGroupedTable.select('key, 'window.start, 'value.avg as 'valavg)
     * }}}
     */
   def select(fields: Expression*): Table = {
+    // get group keys by removing window alias
+
     val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
+
     val projectsOnAgg = replaceAggregationsAndProperties(
       fields, table.tableEnv, aggNames, propNames)
 
     val projectFields = (table.tableEnv, window) match {
       // event time can be arbitrary field in batch environment
       case (_: BatchTableEnvironment, w: EventTimeWindow) =>
-        extractFieldReferences(fields ++ groupKey ++ Seq(w.timeField))
+        extractFieldReferences(fields ++ groupKeys ++ Seq(w.timeField))
       case (_, _) =>
-        extractFieldReferences(fields ++ groupKey)
+        extractFieldReferences(fields ++ groupKeys)
     }
 
     new Table(table.tableEnv,
       Project(
         projectsOnAgg,
         WindowAggregate(
-          groupKey,
+          groupKeys,
           window.toLogicalWindow,
           propNames.map(a => Alias(a._1, a._2)).toSeq,
           aggNames.map(a => Alias(a._1, a._2)).toSeq,
@@ -907,13 +945,13 @@ class GroupWindowedTable(
   }
 
   /**
-    * Performs a selection operation on a group-windows table. Similar to an SQL SELECT statement.
+    * Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement.
     * The field expressions can contain complex expressions and aggregations.
     *
     * Example:
     *
     * {{{
-    *   groupWindowTable.select("key, window.start, value.avg + ' The average' as average")
+    *   windowGroupedTable.select("key, window.start, value.avg as valavg")
     * }}}
     */
   def select(fields: String): Table = {


http://git-wip-us.apache.org/repos/asf/flink/blob/be09143c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
index 7e4498d..5ba6934 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -22,74 +22,73 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.table.plan.logical._
 
 /**
-  * A group-window specification.
+  * A window specification.
   *
-  * Group-windows group rows based on time or row-count intervals and is therefore essentially a
-  * special type of groupBy. Just like groupBy, group-windows allow to compute aggregates
-  * on groups of elements.
+  * Window groups rows based on time or row-count intervals. It is a general way to group the
+  * elements, which is very helpful for both groupby-aggregations and over-aggregations to
+  * compute aggregates on groups of elements.
   *
   * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping
   * is required to apply aggregations on streaming tables.
   *
-  * For finite batch tables, group-windows provide shortcuts for time-based groupBy.
+  * For finite batch tables, window provides shortcuts for time-based groupBy.
   *
   */
-trait GroupWindow {
+abstract class Window {
+  // The expression of alias for this Window
+  private[flink] var alias: Option[Expression] = None
   /**
     * Converts an API class to a logical window for planning.
     */
   private[flink] def toLogicalWindow: LogicalWindow
-}
-
-/**
-  * A group-window operating on event-time.
-  *
-  * @param timeField defines the time mode for streaming tables. For batch table it defines the
-  *                  time attribute on which is grouped.
-  */
-abstract class EventTimeWindow(val timeField: Expression) extends GroupWindow {
-
-  protected var name: Option[Expression] = None
 
   /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
+    * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
+    * refer to. `select()` statement can access window properties such as window start or end time.
     *
     * @param alias alias for this window
     * @return this window
     */
-  def as(alias: Expression): EventTimeWindow = {
-    this.name = Some(alias)
+  def as(alias: Expression): Window = {
+    this.alias = Some(alias)
     this
   }
 
   /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
+    * Assigns an alias for this window that the following `groupBy()` and `select()` clause can
+    * refer to. `select()` statement can access window properties such as window start or end time.
     *
     * @param alias alias for this window
     * @return this window
     */
-  def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias))
+  def as(alias: String): Window = as(ExpressionParser.parseExpression(alias))
 }
 
+/**
+  * A window operating on event-time.
+  *
+  * @param timeField defines the time mode for streaming tables. For batch table it defines the
+  *                  time attribute on which is grouped.
+  */
+abstract class EventTimeWindow(val timeField: Expression) extends Window
+
 // ------------------------------------------------------------------------------------------------
-// Tumbling group-windows
+// Tumbling windows
 // ------------------------------------------------------------------------------------------------
 
 /**
-  * Tumbling group-window.
+  * Tumbling window.
   *
   * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
   * grouped by processing-time.
   *
   * @param size the size of the window either as time or row-count interval.
   */
-class TumblingWindow(size: Expression) extends GroupWindow {
+class TumblingWindow(size: Expression) extends Window {
 
   /**
-    * Tumbling group-window.
+    * Tumbling window.
     *
     * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
     * are grouped by processing-time.
@@ -98,8 +97,6 @@ class TumblingWindow(size: Expression) extends GroupWindow {
     */
   def this(size: String) = this(ExpressionParser.parseExpression(size))
 
-  private var alias: Option[Expression] = None
-
   /**
     * Specifies the time attribute on which rows are grouped.
     *
@@ -109,10 +106,10 @@ class TumblingWindow(size: Expression) extends GroupWindow {
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a tumbling group-window on event-time
+    * @return a tumbling window on event-time
     */
   def on(timeField: Expression): TumblingEventTimeWindow =
-    new TumblingEventTimeWindow(alias, timeField, size)
+    new TumblingEventTimeWindow(timeField, size)
 
   /**
     * Specifies the time attribute on which rows are grouped.
@@ -123,51 +120,29 @@ class TumblingWindow(size: Expression) extends GroupWindow {
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a tumbling group-window on event-time
+    * @return a tumbling window on event-time
     */
   def on(timeField: String): TumblingEventTimeWindow =
     on(ExpressionParser.parseExpression(timeField))
 
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): TumblingWindow = {
-    this.alias = Some(alias)
-    this
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): TumblingWindow = as(ExpressionParser.parseExpression(alias))
-
   override private[flink] def toLogicalWindow: LogicalWindow =
     ProcessingTimeTumblingGroupWindow(alias, size)
 }
 
 /**
-  * Tumbling group-window on event-time.
+  * Tumbling window on event-time.
   */
 class TumblingEventTimeWindow(
-    alias: Option[Expression],
     time: Expression,
     size: Expression)
   extends EventTimeWindow(time) {
 
   override private[flink] def toLogicalWindow: LogicalWindow =
-    EventTimeTumblingGroupWindow(name.orElse(alias), time, size)
+    EventTimeTumblingGroupWindow(alias, time, size)
 }
 
 // ------------------------------------------------------------------------------------------------
-// Sliding group windows
+// Sliding windows
 // ------------------------------------------------------------------------------------------------
 
 /**
@@ -195,7 +170,7 @@ class SlideWithSize(size: Expression) {
     * windows.
     *
     * @param slide the slide of the window either as time or row-count interval.
-    * @return a sliding group-window
+    * @return a sliding window
     */
   def every(slide: Expression): SlidingWindow = new SlidingWindow(size, slide)
 
@@ -210,13 +185,13 @@ class SlideWithSize(size: Expression) {
     * windows.
     *
     * @param slide the slide of the window either as time or row-count interval.
-    * @return a sliding group-window
+    * @return a sliding window
     */
   def every(slide: String): SlidingWindow = every(ExpressionParser.parseExpression(slide))
 }
 
 /**
-  * Sliding group-window.
+  * Sliding window.
   *
   * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
   * grouped by processing-time.
@@ -226,9 +201,7 @@ class SlideWithSize(size: Expression) {
 class SlidingWindow(
     size: Expression,
     slide: Expression)
-  extends GroupWindow {
-
-  private var alias: Option[Expression] = None
+  extends Window {
 
   /**
     * Specifies the time attribute on which rows are grouped.
@@ -239,10 +212,10 @@ class SlidingWindow(
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a sliding group-window on event-time
+    * @return a sliding window on event-time
     */
   def on(timeField: Expression): SlidingEventTimeWindow =
-    new SlidingEventTimeWindow(alias, timeField, size, slide)
+    new SlidingEventTimeWindow(timeField, size, slide)
 
   /**
     * Specifies the time attribute on which rows are grouped.
@@ -253,66 +226,44 @@ class SlidingWindow(
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a sliding group-window on event-time
+    * @return a sliding window on event-time
     */
   def on(timeField: String): SlidingEventTimeWindow =
     on(ExpressionParser.parseExpression(timeField))
 
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): SlidingWindow = {
-    this.alias = Some(alias)
-    this
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): SlidingWindow = as(ExpressionParser.parseExpression(alias))
-
   override private[flink] def toLogicalWindow: LogicalWindow =
     ProcessingTimeSlidingGroupWindow(alias, size, slide)
 }
 
 /**
-  * Sliding group-window on event-time.
+  * Sliding window on event-time.
   */
 class SlidingEventTimeWindow(
-    alias: Option[Expression],
     timeField: Expression,
     size: Expression,
     slide: Expression)
   extends EventTimeWindow(timeField) {
 
   override private[flink] def toLogicalWindow: LogicalWindow =
-    EventTimeSlidingGroupWindow(name.orElse(alias), timeField, size, slide)
+    EventTimeSlidingGroupWindow(alias, timeField, size, slide)
 }
 
 // ------------------------------------------------------------------------------------------------
-// Session group windows
+// Session windows
 // ------------------------------------------------------------------------------------------------
 
 /**
-  * Session group-window.
+  * Session window.
   *
   * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
   * grouped by processing-time.
   *
   * @param gap the time interval of inactivity before a window is closed.
   */
-class SessionWindow(gap: Expression) extends GroupWindow {
+class SessionWindow(gap: Expression) extends Window {
 
   /**
-    * Session group-window.
+    * Session window.
     *
     * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
     * are grouped by processing-time.
@@ -321,8 +272,6 @@ class SessionWindow(gap: Expression) extends GroupWindow {
     */
   def this(gap: String) = this(ExpressionParser.parseExpression(gap))
 
-  private var alias: Option[Expression] = None
-
   /**
     * Specifies the time attribute on which rows are grouped.
     *
@@ -332,10 +281,10 @@ class SessionWindow(gap: Expression) extends GroupWindow {
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a session group-window on event-time
+    * @return a session window on event-time
     */
   def on(timeField: Expression): SessionEventTimeWindow =
-    new SessionEventTimeWindow(alias, timeField, gap)
+    new SessionEventTimeWindow(timeField, gap)
 
   /**
     * Specifies the time attribute on which rows are grouped.
@@ -346,45 +295,23 @@ class SessionWindow(gap: Expression) extends GroupWindow {
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a session group-window on event-time
+    * @return a session window on event-time
     */
   def on(timeField: String): SessionEventTimeWindow =
     on(ExpressionParser.parseExpression(timeField))
 
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): SessionWindow = {
-    this.alias = Some(alias)
-    this
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): SessionWindow = as(ExpressionParser.parseExpression(alias))
-
   override private[flink] def toLogicalWindow: LogicalWindow =
     ProcessingTimeSessionGroupWindow(alias, gap)
 }
 
 /**
-  * Session group-window on event-time.
+  * Session window on event-time.
   */
 class SessionEventTimeWindow(
-    alias: Option[Expression],
     timeField: Expression,
     gap: Expression)
   extends EventTimeWindow(timeField) {
 
   override private[flink] def toLogicalWindow: LogicalWindow =
-    EventTimeSessionGroupWindow(name.orElse(alias), timeField, gap)
+    EventTimeSessionGroupWindow(alias, timeField, gap)
 }


http://git-wip-us.apache.org/repos/asf/flink/blob/be09143c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
index b12e654..13d40b4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
@@ -25,9 +25,9 @@ import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeIn
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
 abstract class EventTimeGroupWindow(
-    name: Option[Expression],
+    alias: Option[Expression],
     time: Expression)
-  extends LogicalWindow(name) {
+  extends LogicalWindow(alias) {
 
   override def validate(tableEnv: TableEnvironment): ValidationResult = {
     val valid = super.validate(tableEnv)
@@ -55,7 +55,22 @@ abstract class EventTimeGroupWindow(
   }
 }
 
-abstract class ProcessingTimeGroupWindow(name: Option[Expression]) extends LogicalWindow(name)
+abstract class ProcessingTimeGroupWindow(alias: Option[Expression]) extends LogicalWindow(alias) {
+
+  override def validate(tableEnv: TableEnvironment): ValidationResult = {
+    val valid = super.validate(tableEnv)
+    if (valid.isFailure) {
+      return valid
+    }
+
+    tableEnv match {
+      case b: BatchTableEnvironment => ValidationFailure(
+        "Window on batch must declare a time attribute over which the query is evaluated.")
+      case _ =>
+        ValidationSuccess
+    }
+  }
+}
 
 // ------------------------------------------------------------------------------------------------
 // Tumbling group windows
@@ -74,32 +89,32 @@ object TumblingGroupWindow {
 }
 
 case class ProcessingTimeTumblingGroupWindow(
-    name: Option[Expression],
+    override val alias: Option[Expression],
     size: Expression)
-  extends ProcessingTimeGroupWindow(name) {
+  extends ProcessingTimeGroupWindow(alias) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
     ProcessingTimeTumblingGroupWindow(
-      name.map(resolve),
+      alias.map(resolve),
       resolve(size))
 
   override def validate(tableEnv: TableEnvironment): ValidationResult =
     super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size))
 
-  override def toString: String = s"ProcessingTimeTumblingGroupWindow($name, $size)"
+  override def toString: String = s"ProcessingTimeTumblingGroupWindow($alias, $size)"
 }
 
 case class EventTimeTumblingGroupWindow(
-    name: Option[Expression],
+    override val alias: Option[Expression],
     timeField: Expression,
     size: Expression)
   extends EventTimeGroupWindow(
-    name,
+    alias,
     timeField) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
     EventTimeTumblingGroupWindow(
-      name.map(resolve),
+      alias.map(resolve),
       resolve(timeField),
       resolve(size))
 
@@ -114,7 +129,7 @@ case class EventTimeTumblingGroupWindow(
       ValidationSuccess
     })
 
-  override def toString: String = s"EventTimeTumblingGroupWindow($name, $timeField, $size)"
+  override def toString: String = s"EventTimeTumblingGroupWindow($alias, 
$timeField, $size)" } // ------------------------------------------------------------------------------------------------ @@ -161,33 +176,33 @@ object SlidingGroupWindow { } case class ProcessingTimeSlidingGroupWindow( - name: Option[Expression], + override val alias: Option[Expression], size: Expression, slide: Expression) - extends ProcessingTimeGroupWindow(name) { + extends ProcessingTimeGroupWindow(alias) { override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = ProcessingTimeSlidingGroupWindow( - name.map(resolve), + alias.map(resolve), resolve(size), resolve(slide)) override def validate(tableEnv: TableEnvironment): ValidationResult = super.validate(tableEnv).orElse(SlidingGroupWindow.validate(tableEnv, size, slide)) - override def toString: String = s"ProcessingTimeSlidingGroupWindow($name, $size, $slide)" + override def toString: String = s"ProcessingTimeSlidingGroupWindow($alias, $size, $slide)" } case class EventTimeSlidingGroupWindow( - name: Option[Expression], + override val alias: Option[Expression], timeField: Expression, size: Expression, slide: Expression) - extends EventTimeGroupWindow(name, timeField) { + extends EventTimeGroupWindow(alias, timeField) { override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = EventTimeSlidingGroupWindow( - name.map(resolve), + alias.map(resolve), resolve(timeField), resolve(size), resolve(slide)) @@ -203,7 +218,7 @@ case class EventTimeSlidingGroupWindow( ValidationSuccess }) - override def toString: String = s"EventTimeSlidingGroupWindow($name, $timeField, $size, $slide)" + override def toString: String = s"EventTimeSlidingGroupWindow($alias, $timeField, $size, $slide)" } // ------------------------------------------------------------------------------------------------ @@ -222,37 +237,37 @@ object SessionGroupWindow { } case class ProcessingTimeSessionGroupWindow( - name: Option[Expression], + override val alias: Option[Expression], gap: Expression) - 
extends ProcessingTimeGroupWindow(name) { + extends ProcessingTimeGroupWindow(alias) { override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = ProcessingTimeSessionGroupWindow( - name.map(resolve), + alias.map(resolve), resolve(gap)) override def validate(tableEnv: TableEnvironment): ValidationResult = super.validate(tableEnv).orElse(SessionGroupWindow.validate(tableEnv, gap)) - override def toString: String = s"ProcessingTimeSessionGroupWindow($name, $gap)" + override def toString: String = s"ProcessingTimeSessionGroupWindow($alias, $gap)" } case class EventTimeSessionGroupWindow( - name: Option[Expression], + override val alias: Option[Expression], timeField: Expression, gap: Expression) extends EventTimeGroupWindow( - name, + alias, timeField) { override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = EventTimeSessionGroupWindow( - name.map(resolve), + alias.map(resolve), resolve(timeField), resolve(gap)) override def validate(tableEnv: TableEnvironment): ValidationResult = super.validate(tableEnv).orElse(SessionGroupWindow.validate(tableEnv, gap)) - override def toString: String = s"EventTimeSessionGroupWindow($name, $timeField, $gap)" + override def toString: String = s"EventTimeSessionGroupWindow($alias, $timeField, $gap)" } http://git-wip-us.apache.org/repos/asf/flink/blob/be09143c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala index cc691d2..4b7147c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala @@ -228,6 +228,7 @@ class FieldProjectionTest extends TableTestBase { val sourceTable = streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd) val resultTable = sourceTable .window(Tumble over 5.millis on 'rowtime as 'w) + .groupBy('w) .select(Upper('c).count, 'a.sum) val expected = @@ -253,8 +254,8 @@ class FieldProjectionTest extends TableTestBase { def testSelectFromStreamingGroupedWindow(): Unit = { val sourceTable = streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd) val resultTable = sourceTable - .groupBy('b) .window(Tumble over 5.millis on 'rowtime as 'w) + .groupBy('w, 'b) .select(Upper('c).count, 'a.sum, 'b) val expected = unaryNode( @@ -287,7 +288,6 @@ class FieldProjectionTest extends TableTestBase { .groupBy('word) .select('word, 'frequency.sum as 'frequency) .filter('frequency === 2) - val expected = unaryNode( "DataSetCalc", @@ -310,8 +310,9 @@ class FieldProjectionTest extends TableTestBase { // time field is selected val resultTable = sourceTable - .window(Tumble over 5.millis on 'a as 'w) - .select('a.sum, 'c.count) + .window(Tumble over 5.millis on 'a as 'w) + .groupBy('w) + .select('a.sum, 'c.count) val expected = "TODO" @@ -324,8 +325,9 @@ class FieldProjectionTest extends TableTestBase { // time field is not selected val resultTable = sourceTable - .window(Tumble over 5.millis on 'a as 'w) - .select('c.count) + .window(Tumble over 5.millis on 'a as 'w) + .groupBy('w) + .select('c.count) val expected = "TODO" http://git-wip-us.apache.org/repos/asf/flink/blob/be09143c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala index f41cae1..a243db7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala @@ -58,8 +58,8 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { val table = stream.toTable(tEnv, 'long, 'int, 'string) val windowedTable = table - .groupBy('string) - .window(Slide over 2.rows every 1.rows) + .window(Slide over 2.rows every 1.rows as 'w) + .groupBy('w, 'string) .select('string, 'int.count, 'int.avg) val results = windowedTable.toDataStream[Row] @@ -83,8 +83,8 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { val table = stream.toTable(tEnv, 'long, 'int, 'string) val windowedTable = table - .groupBy('string) - .window(Session withGap 7.milli on 'rowtime) + .window(Session withGap 7.milli on 'rowtime as 'w) + .groupBy('w, 'string) .select('string, 'int.count) val results = windowedTable.toDataStream[Row] @@ -106,7 +106,8 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { val table = stream.toTable(tEnv, 'long, 'int, 'string) val windowedTable = table - .window(Tumble over 2.rows) + .window(Tumble over 2.rows as 'w) + .groupBy('w) .select('int.count) val results = windowedTable.toDataStream[Row] @@ -130,8 +131,8 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { val table = stream.toTable(tEnv, 'long, 'int, 'string) val windowedTable = table - .groupBy('string) .window(Tumble over 5.milli on 'rowtime as 'w) + .groupBy('w, 'string) .select('string, 'int.count, 'int.avg, 'w.start, 'w.end) val results = windowedTable.toDataStream[Row] @@ -159,8 +160,8 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { val table = stream.toTable(tEnv, 'long, 'int, 'string) val 
windowedTable = table - .groupBy('string) .window(Slide over 10.milli every 5.milli on 'rowtime as 'w) + .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end, 'w.start) val results = windowedTable.toDataStream[Row] http://git-wip-us.apache.org/repos/asf/flink/blob/be09143c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala index ee24cf7..337d8e9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala @@ -37,8 +37,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) table - .groupBy('string) - .window(Session withGap 100.milli as 'string) + .window(Session withGap 100.milli as 'w) + .groupBy('w) } @Test(expected = classOf[ValidationException]) @@ -81,20 +81,45 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) table + // only rowtime is a valid time attribute in a stream environment + .window(Tumble over 50.milli on 'string as 'w) + .groupBy('w, 'string) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testGroupByWithoutWindowAlias(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table + .window(Tumble over 5.milli on 'long as 'w) .groupBy('string) - // only rowtime is a valid time attribute in a stream environment - .window(Tumble over 50.milli on 'string) 
.select('string, 'int.count) } @Test(expected = classOf[ValidationException]) + def testInvalidRowTimeRef(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table + .window(Tumble over 5.milli on 'long as 'w) + .groupBy('w, 'string) + .select('string, 'int.count) + .window(Slide over 5.milli every 1.milli on 'int as 'w2) // 'Int does not exist in input. + .groupBy('w2) + .select('string) + } + + @Test(expected = classOf[ValidationException]) def testInvalidTumblingSize(): Unit = { val util = streamTestUtil() val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) table - .groupBy('string) - .window(Tumble over "WRONG") // string is not a valid interval + .window(Tumble over "WRONG" as 'w) // string is not a valid interval + .groupBy('w, 'string) .select('string, 'int.count) } @@ -104,8 +129,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) table - .groupBy('string) - .window(Slide over "WRONG" every "WRONG") // string is not a valid interval + .window(Slide over "WRONG" every "WRONG" as 'w) // string is not a valid interval + .groupBy('w, 'string) .select('string, 'int.count) } @@ -115,8 +140,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) table - .groupBy('string) - .window(Slide over 12.rows every 1.minute) // row and time intervals may not be mixed + .window(Slide over 12.rows every 1.minute as 'w) // row and time intervals may not be mixed + .groupBy('w, 'string) .select('string, 'int.count) } @@ -126,8 +151,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) table - .groupBy('string) - .window(Session withGap 10.rows) // row interval is not valid for session windows + .window(Session withGap 10.rows as 'w) // row interval is not valid for session windows + .groupBy('w, 
'string) .select('string, 'int.count) } @@ -137,8 +162,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) table - .groupBy('string) .window(Session withGap 100.milli as 1 + 1) // expression instead of a symbol + .groupBy('string) .select('string, 'int.count) } @@ -148,9 +173,53 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) table - .groupBy('string) .window(Session withGap 100.milli as 'string) // field name "string" is already present + .groupBy('string) + .select('string, 'int.count) + } + + @Test + def testMultiWindow(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .window(Tumble over 50.milli as 'w1) + .groupBy('w1, 'string) .select('string, 'int.count) + .window(Slide over 20.milli every 10.milli as 'w2) + .groupBy('w2) + .select('string.count) + + val expected = unaryNode( + "DataStreamAggregate", + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "string", "int") + ), + term("groupBy", "string"), + term( + "window", + ProcessingTimeTumblingGroupWindow( + Some(WindowReference("w1")), + 50.milli)), + term("select", "string", "COUNT(int) AS TMP_0") + ), + term("select", "string") + ), + term( + "window", + ProcessingTimeSlidingGroupWindow( + Some(WindowReference("w2")), + 20.milli, 10.milli)), + term("select", "COUNT(string) AS TMP_2") + ) + util.verifyTable(windowedTable, expected) } @Test @@ -159,8 +228,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) - .window(Tumble over 50.milli) + .window(Tumble over 50.milli as 'w) + .groupBy('w, 'string) .select('string, 'int.count) val expected = unaryNode( @@ -171,7 +240,11 @@ 
class GroupWindowTest extends TableTestBase { term("select", "string", "int") ), term("groupBy", "string"), - term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)), + term( + "window", + ProcessingTimeTumblingGroupWindow( + Some(WindowReference("w")), + 50.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -184,8 +257,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) - .window(Tumble over 2.rows) + .window(Tumble over 2.rows as 'w) + .groupBy('w, 'string) .select('string, 'int.count) val expected = unaryNode( @@ -196,7 +269,10 @@ class GroupWindowTest extends TableTestBase { term("select", "string", "int") ), term("groupBy", "string"), - term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)), + term( + "window", + ProcessingTimeTumblingGroupWindow( + Some(WindowReference("w")), 2.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -209,8 +285,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) - .window(Tumble over 5.milli on 'rowtime) + .window(Tumble over 5.milli on 'rowtime as 'w) + .groupBy('w, 'string) .select('string, 'int.count) val expected = unaryNode( @@ -221,7 +297,12 @@ class GroupWindowTest extends TableTestBase { term("select", "string", "int") ), term("groupBy", "string"), - term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)), + term( + "window", + EventTimeTumblingGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), + 5.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -235,15 +316,19 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) - .window(Tumble over 2.rows on 'rowtime) + .window(Tumble over 2.rows on 'rowtime as 
'w) + .groupBy('w, 'string) .select('string, 'int.count) val expected = unaryNode( "DataStreamAggregate", streamTableNode(0), term("groupBy", "string"), - term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)), + term( + "window", + EventTimeTumblingGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), 2.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -256,8 +341,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) - .window(Slide over 50.milli every 50.milli) + .window(Slide over 50.milli every 50.milli as 'w) + .groupBy('w, 'string) .select('string, 'int.count) val expected = unaryNode( @@ -268,7 +353,11 @@ class GroupWindowTest extends TableTestBase { term("select", "string", "int") ), term("groupBy", "string"), - term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)), + term( + "window", + ProcessingTimeSlidingGroupWindow( + Some(WindowReference("w")), + 50.milli, 50.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -281,8 +370,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) - .window(Slide over 2.rows every 1.rows) + .window(Slide over 2.rows every 1.rows as 'w) + .groupBy('w, 'string) .select('string, 'int.count) val expected = unaryNode( @@ -293,7 +382,11 @@ class GroupWindowTest extends TableTestBase { term("select", "string", "int") ), term("groupBy", "string"), - term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)), + term( + "window", + ProcessingTimeSlidingGroupWindow( + Some(WindowReference("w")), + 2.rows, 1.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -306,8 +399,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - 
.groupBy('string) - .window(Slide over 8.milli every 10.milli on 'rowtime) + .window(Slide over 8.milli every 10.milli on 'rowtime as 'w) + .groupBy('w, 'string) .select('string, 'int.count) val expected = unaryNode( @@ -318,7 +411,11 @@ class GroupWindowTest extends TableTestBase { term("select", "string", "int") ), term("groupBy", "string"), - term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)), + term( + "window", + EventTimeSlidingGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), 8.milli, 10.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -332,15 +429,19 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) - .window(Slide over 2.rows every 1.rows on 'rowtime) + .window(Slide over 2.rows every 1.rows on 'rowtime as 'w) + .groupBy('w, 'string) .select('string, 'int.count) val expected = unaryNode( "DataStreamAggregate", streamTableNode(0), term("groupBy", "string"), - term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)), + term( + "window", + EventTimeSlidingGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), 2.rows, 1.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -353,8 +454,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) - .window(Session withGap 7.milli on 'rowtime) + .window(Session withGap 7.milli on 'rowtime as 'w) + .groupBy('w, 'string) .select('string, 'int.count) val expected = unaryNode( @@ -365,7 +466,11 @@ class GroupWindowTest extends TableTestBase { term("select", "string", "int") ), term("groupBy", "string"), - term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)), + term( + "window", + EventTimeSessionGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), 
7.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -378,8 +483,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) - .window(Tumble over 50.milli) + .window(Tumble over 50.milli as 'w) + .groupBy('w, 'string) .select('string, 'int.count) val expected = unaryNode( @@ -390,7 +495,11 @@ class GroupWindowTest extends TableTestBase { term("select", "string", "int") ), term("groupBy", "string"), - term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)), + term( + "window", + ProcessingTimeTumblingGroupWindow( + Some(WindowReference("w")), + 50.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -403,7 +512,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .window(Tumble over 2.rows) + .window(Tumble over 2.rows as 'w) + .groupBy('w) .select('int.count) val expected = unaryNode( @@ -413,7 +523,11 @@ class GroupWindowTest extends TableTestBase { streamTableNode(0), term("select", "int") ), - term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)), + term( + "window", + ProcessingTimeTumblingGroupWindow( + Some(WindowReference("w")), + 2.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -426,7 +540,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .window(Tumble over 5.milli on 'rowtime) + .window(Tumble over 5.milli on 'rowtime as 'w) + .groupBy('w) .select('int.count) val expected = unaryNode( @@ -436,7 +551,11 @@ class GroupWindowTest extends TableTestBase { streamTableNode(0), term("select", "int") ), - term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)), + term( + "window", + EventTimeTumblingGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), 5.milli)), term("select", "COUNT(int) AS 
TMP_0") ) @@ -450,7 +569,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .window(Tumble over 2.rows on 'rowtime) + .window(Tumble over 2.rows on 'rowtime as 'w) + .groupBy('w) .select('int.count) val expected = unaryNode( @@ -460,7 +580,11 @@ class GroupWindowTest extends TableTestBase { streamTableNode(0), term("select", "int") ), - term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)), + term( + "window", + EventTimeTumblingGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), 2.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -474,7 +598,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .window(Slide over 50.milli every 50.milli) + .window(Slide over 50.milli every 50.milli as 'w) + .groupBy('w) .select('int.count) val expected = unaryNode( @@ -484,7 +609,11 @@ class GroupWindowTest extends TableTestBase { streamTableNode(0), term("select", "int") ), - term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)), + term( + "window", + ProcessingTimeSlidingGroupWindow( + Some(WindowReference("w")), + 50.milli, 50.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -497,7 +626,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .window(Slide over 2.rows every 1.rows) + .window(Slide over 2.rows every 1.rows as 'w) + .groupBy('w) .select('int.count) val expected = unaryNode( @@ -507,7 +637,11 @@ class GroupWindowTest extends TableTestBase { streamTableNode(0), term("select", "int") ), - term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)), + term( + "window", + ProcessingTimeSlidingGroupWindow( + Some(WindowReference("w")), + 2.rows, 1.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -520,7 +654,8 @@ class 
GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .window(Slide over 8.milli every 10.milli on 'rowtime) + .window(Slide over 8.milli every 10.milli on 'rowtime as 'w) + .groupBy('w) .select('int.count) val expected = unaryNode( @@ -530,7 +665,11 @@ class GroupWindowTest extends TableTestBase { streamTableNode(0), term("select", "int") ), - term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)), + term( + "window", + EventTimeSlidingGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), 8.milli, 10.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -544,7 +683,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .window(Slide over 2.rows every 1.rows on 'rowtime) + .window(Slide over 2.rows every 1.rows on 'rowtime as 'w) + .groupBy('w) .select('int.count) val expected = unaryNode( @@ -554,7 +694,11 @@ class GroupWindowTest extends TableTestBase { streamTableNode(0), term("select", "int") ), - term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)), + term( + "window", + EventTimeSlidingGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), 2.rows, 1.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -567,7 +711,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .window(Session withGap 7.milli on 'rowtime) + .window(Session withGap 7.milli on 'rowtime as 'w) + .groupBy('w) .select('int.count) val expected = unaryNode( @@ -577,7 +722,11 @@ class GroupWindowTest extends TableTestBase { streamTableNode(0), term("select", "int") ), - term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)), + term( + "window", + EventTimeSessionGroupWindow( + Some(WindowReference("w")), + 
RowtimeAttribute(), 7.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -590,8 +739,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) .window(Tumble over 5.milli on 'rowtime as 'w) + .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) val expected = unaryNode( @@ -623,8 +772,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) .window(Slide over 10.milli every 5.milli on 'rowtime as 'w) + .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) val expected = unaryNode( @@ -657,8 +806,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) .window(Session withGap 3.milli on 'rowtime as 'w) + .groupBy('w, 'string) .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2) val expected = unaryNode( @@ -694,8 +843,8 @@ class GroupWindowTest extends TableTestBase { val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table - .groupBy('string) .window(Tumble over 5.millis on 'rowtime as 'w) + .groupBy('w, 'string) .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2, 'w.end as 'x3, 'w.end)