From: fhueske@apache.org
To: commits@flink.apache.org
Date: Wed, 25 Oct 2017 20:02:16 -0000
Subject: [3/3] flink git commit: [FLINK-6697] [table] Add support for group window ROWTIME to batch SQL & Table API.

[FLINK-6697] [table] Add support for group window ROWTIME to batch SQL & Table API.

- Fixes [FLINK-7542] "AggregateITCase fails in different timezone."

This closes #4796.
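For context, the new batch support can be exercised as in the AggregateITCase test added further below. A minimal editorial sketch (the table name `T` and its schema are assumptions of this example, not part of the commit):

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Batch environment: selecting TUMBLE_ROWTIME next to TUMBLE_START and
// TUMBLE_END is what this commit enables for batch SQL.
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// assumed: a registered table "T" with columns a, b and a TIMESTAMP column ts
val result = tEnv.sqlQuery(
  "SELECT b, COUNT(a), " +
  "  TUMBLE_START(ts, INTERVAL '5' SECOND), " +
  "  TUMBLE_END(ts, INTERVAL '5' SECOND), " +
  "  TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND) " +
  "FROM T GROUP BY b, TUMBLE(ts, INTERVAL '5' SECOND)").toDataSet[Row]
{% endhighlight %}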
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/babee277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/babee277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/babee277

Branch: refs/heads/master
Commit: babee277204b6b1edcfe9c7c76348254019b2dd3
Parents: 14e0948
Author: Fabian Hueske
Authored: Sun Aug 6 23:55:56 2017 +0200
Committer: Fabian Hueske
Committed: Wed Oct 25 22:01:39 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/tableApi.md                      |  16 +--
 .../calcite/sql/fun/SqlGroupFunction.java       |  20 +++
 .../table/expressions/fieldExpression.scala     |  25 +++-
 .../rules/common/WindowPropertiesRule.scala     |  70 ++++++----
 .../DataSetLogicalWindowAggregateRule.scala     |   7 +-
 .../DataStreamLogicalWindowAggregateRule.scala  |   7 +-
 .../table/runtime/aggregate/AggregateUtil.scala |  12 +-
 ...SetSessionWindowAggReduceGroupFunction.scala |   8 +-
 ...SetSlideWindowAggReduceCombineFunction.scala |   4 +
 ...taSetSlideWindowAggReduceGroupFunction.scala |  14 +-
 ...mbleTimeWindowAggReduceCombineFunction.scala |   4 +
 ...TumbleTimeWindowAggReduceGroupFunction.scala |  10 +-
 ...rementalAggregateAllTimeWindowFunction.scala |   4 +-
 ...IncrementalAggregateTimeWindowFunction.scala |   4 +-
 .../aggregate/TimeWindowPropertyCollector.scala |  18 ++-
 .../flink/table/validate/FunctionCatalog.scala  |  27 +++-
 .../table/api/batch/sql/GroupWindowTest.scala   |  24 ++--
 .../validation/GroupWindowValidationTest.scala  |  17 +--
 .../table/api/batch/table/GroupWindowTest.scala | 134 +++++++++++++++
 .../runtime/batch/sql/AggregateITCase.scala     | 123 +++++++++++++--
 .../runtime/batch/table/GroupWindowITCase.scala |  97 ++++++------
 .../table/runtime/stream/sql/SqlITCase.scala    |   4 +-
 .../stream/table/GroupWindowITCase.scala        |  26 ++--
 23 files changed, 511 insertions(+), 164 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f0c4605..2b45b5a 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -344,7 +344,7 @@ Table orders = tableEnv.scan("Orders");
 Table result = orders
     .window(Tumble.over("5.minutes").on("rowtime").as("w")) // define window
     .groupBy("a, w") // group by key and window
-    .select("a, w.start, w.end, b.sum as d"); // access window properties and aggregate
+    .select("a, w.start, w.end, w.rowtime, b.sum as d"); // access window properties and aggregate
{% endhighlight %}

@@ -427,7 +427,7 @@ val orders: Table = tableEnv.scan("Orders")
 val result: Table = orders
     .window(Tumble over 5.minutes on 'rowtime as 'w) // define window
     .groupBy('a, 'w) // group by key and window
-    .select('a, w.start, 'w.end, 'b.sum as 'd) // access window properties and aggregate
+    .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.sum as 'd) // access window properties and aggregate
{% endhighlight %}

@@ -1178,7 +1178,7 @@ val table = input
-Window properties such as the start and end timestamp of a time window can be added in the select statement as a property of the window alias as `w.start` and `w.end`, respectively.
+Window properties such as the start, end, or rowtime timestamp of a time window can be added in the select statement as a property of the window alias as `w.start`, `w.end`, and `w.rowtime`, respectively.
The window start and rowtime timestamps are the inclusive lower and upper window boundaries. In contrast, the window end timestamp is the exclusive upper window boundary. For example, a tumbling window of 30 minutes that starts at 2pm would have `14:00:00.000` as start timestamp, `14:29:59.999` as rowtime timestamp, and `14:30:00.000` as end timestamp.
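To make the "exclusive end, inclusive rowtime" relation concrete, here is a tiny editorial sketch (plain epoch-millisecond arithmetic, not code from the commit) mirroring how the property collector below derives the rowtime from the window end:

{% highlight scala %}
// A time window covers [start, end): end is exclusive, so the rowtime
// property is the last millisecond that still belongs to the window.
val start: Long = 14L * 60 * 60 * 1000 // 14:00:00.000 as millis of the day
val size: Long = 30L * 60 * 1000       // 30 minutes
val end: Long = start + size           // 14:30:00.000, exclusive
val rowtime: Long = end - 1            // 14:29:59.999, inclusive
{% endhighlight %}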
@@ -1186,7 +1186,7 @@ Window properties such as the start and end timestamp of a time window can be ad
 Table table = input
     .window([Window w].as("w")) // define window with alias w
     .groupBy("w, a") // group the table by attribute a and window w
-    .select("a, w.start, w.end, b.count"); // aggregate and add window start and end timestamps
+    .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
{% endhighlight %}
@@ -1195,7 +1195,7 @@ Table table = input
 val table = input
     .window([w: Window] as 'w) // define window with alias w
     .groupBy('w, 'a) // group the table by attribute a and window w
-    .select('a, 'w.start, 'w.end, 'b.count) // aggregate and add window start and end timestamps
+    .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps
{% endhighlight %}
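Because `w.rowtime` behaves like a regular event-time attribute (and maps to a SQL TIMESTAMP in batch), it can, for instance, feed a second group window on top of the first aggregation. A minimal editorial sketch, assuming the usual `org.apache.flink.table.api.scala._` import and a table `input` with fields `'a`, `'b`, and a time attribute `'rowtime`:

{% highlight scala %}
// First level: 5-minute tumbling windows; expose the window rowtime
// so the next window can use it as its time attribute.
val result = input
  .window(Tumble over 5.minutes on 'rowtime as 'w)
  .groupBy('w, 'a)
  .select('a, 'b.sum as 'b, 'w.rowtime as 'rowtime)
  // Second level: roll the 5-minute results up into 1-hour windows.
  .window(Tumble over 1.hour on 'rowtime as 'w2)
  .groupBy('w2, 'a)
  .select('a, 'b.sum as 'hourlyB)
{% endhighlight %}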
@@ -1227,7 +1227,7 @@ Tumbling windows are defined by using the `Tumble` class as follows:
 as
-  Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start or end time in the select() clause.
+  Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start, end, or rowtime timestamps in the select() clause.

@@ -1289,7 +1289,7 @@ Sliding windows are defined by using the `Slide` class as follows:
 as
-  Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start or end time in the select() clause.
+  Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start, end, or rowtime timestamps in the select() clause.

@@ -1347,7 +1347,7 @@ A session window is defined by using the `Session` class as follows:
 as
-  Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start or end time in the select() clause.
+  Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start, end, or rowtime timestamps in the select() clause.

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
index fd5ddf9..0bb26da 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
@@ -22,6 +22,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperatorBinding;
 import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
 import org.apache.calcite.sql.validate.SqlMonotonicity;

 import com.google.common.collect.ImmutableList;

@@ -84,6 +85,25 @@ public class SqlGroupFunction extends SqlFunction {
     this(kind.name(), kind, groupFunction, operandTypeChecker);
   }

+  /** Creates a SqlGroupFunction.
+   *
+   * @param name Function name
+   * @param kind Kind
+   * @param groupFunction Group function, if this is an auxiliary;
+   *                      null, if this is a group function
+   * @param returnTypeInference Inference of the function's return type
+   * @param operandTypeChecker Operand type checker
+   */
+  public SqlGroupFunction(String name, SqlKind kind, SqlGroupFunction groupFunction,
+      SqlReturnTypeInference returnTypeInference, SqlOperandTypeChecker operandTypeChecker) {
+    super(name, kind, returnTypeInference, null, operandTypeChecker,
+        SqlFunctionCategory.SYSTEM);
+    this.groupFunction = groupFunction;
+    if (groupFunction != null) {
+      assert groupFunction.groupFunction == null;
+    }
+  }
+
  /** Creates an auxiliary function from this grouped window function.
* * @param kind Kind; also determines function name http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala index dab9ce3..bad5889 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala @@ -20,9 +20,9 @@ package org.apache.flink.table.expressions import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.{Table, UnresolvedException, ValidationException} +import org.apache.flink.table.api._ import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty -import org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType, isTimeIndicatorType} +import org.apache.flink.table.calcite.FlinkTypeFactory._ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} @@ -163,9 +163,13 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) { override private[flink] def validateInput(): ValidationResult = { child match { - case WindowReference(_, Some(tpe)) if !isRowtimeIndicatorType(tpe) => + case WindowReference(_, Some(tpe)) if isProctimeIndicatorType(tpe) => ValidationFailure("A proctime window cannot provide a rowtime attribute.") case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) => + // rowtime window + ValidationSuccess + case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP => + // batch time window ValidationSuccess case WindowReference(_, _) => ValidationFailure("Reference to a rowtime or proctime window required.") @@ -175,8 +179,19 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) { } } - override def resultType: TypeInformation[_] = - TimeIndicatorTypeInfo.ROWTIME_INDICATOR + override def resultType: TypeInformation[_] = { + child match { + case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) => + // rowtime window + TimeIndicatorTypeInfo.ROWTIME_INDICATOR + case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP => + // batch time window + Types.SQL_TIMESTAMP + case _ => + throw TableException("WindowReference of RowtimeAttribute has invalid type. 
" + + "Please report this bug.") + } + } override def toNamedWindowProperty(name: String): NamedWindowProperty = NamedWindowProperty(name, this) http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala index c228528..74f7b1b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala @@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject} import org.apache.calcite.rex.{RexCall, RexNode} import org.apache.calcite.tools.RelBuilder -import org.apache.flink.table.api.{TableException, ValidationException} +import org.apache.flink.table.api.{TableException, Types, ValidationException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.logical.LogicalWindow @@ -59,23 +59,24 @@ abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleNa agg: LogicalWindowAggregate): RelNode = { val w = agg.getWindow - - val isRowtime = ExpressionUtils.isRowtimeAttribute(w.timeAttribute) - val isProctime = ExpressionUtils.isProctimeAttribute(w.timeAttribute) + val windowType = getWindowType(w) val startEndProperties = Seq( NamedWindowProperty(propertyName(w, "start"), WindowStart(w.aliasAttribute)), NamedWindowProperty(propertyName(w, "end"), WindowEnd(w.aliasAttribute))) // allow rowtime/proctime for rowtime windows and proctime for proctime windows - val timeProperties = if (isRowtime) { - Seq( - NamedWindowProperty(propertyName(w, "rowtime"), RowtimeAttribute(w.aliasAttribute)), - NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute))) - } else if (isProctime) { - Seq(NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute))) - } else { - Seq() + val timeProperties = windowType match { + case 'streamRowtime => + Seq( + NamedWindowProperty(propertyName(w, "rowtime"), RowtimeAttribute(w.aliasAttribute)), + NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute))) + case 'streamProctime => + Seq(NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute))) + case 'batchRowtime => + Seq(NamedWindowProperty(propertyName(w, "rowtime"), RowtimeAttribute(w.aliasAttribute))) + case _ => + throw new TableException("Unknown window type encountered. 
Please report this bug.") } val properties = startEndProperties ++ timeProperties @@ -103,6 +104,18 @@ abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleNa builder.build() } + private def getWindowType(window: LogicalWindow): Symbol = { + if (ExpressionUtils.isRowtimeAttribute(window.timeAttribute)) { + 'streamRowtime + } else if (ExpressionUtils.isProctimeAttribute(window.timeAttribute)) { + 'streamProctime + } else if (window.timeAttribute.resultType == Types.SQL_TIMESTAMP) { + 'batchRowtime + } else { + throw new TableException("Unknown window type encountered. Please report this bug.") + } + } + /** Generates a property name for a window. */ private def propertyName(window: LogicalWindow, name: String): String = { window.aliasAttribute.asInstanceOf[WindowReference].name + name @@ -115,9 +128,7 @@ abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleNa builder: RelBuilder): RexNode = { val rexBuilder = builder.getRexBuilder - - val isRowtime = ExpressionUtils.isRowtimeAttribute(window.timeAttribute) - val isProctime = ExpressionUtils.isProctimeAttribute(window.timeAttribute) + val windowType = getWindowType(window) node match { case c: RexCall if isWindowStart(c) => @@ -129,22 +140,25 @@ abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleNa rexBuilder.makeCast(c.getType, builder.field(propertyName(window, "end")), false) case c: RexCall if isWindowRowtime(c) => - if (isProctime) { - throw ValidationException("A proctime window cannot provide a rowtime attribute.") - } else if (isRowtime) { - // replace expression by access to window rowtime - builder.field(propertyName(window, "rowtime")) - } else { - throw TableException("Accessing the rowtime attribute of a window is not yet " + - "supported in a batch environment.") + windowType match { + case 'streamRowtime | 'batchRowtime => + // replace expression by access to window rowtime + builder.field(propertyName(window, "rowtime")) + case 'streamProctime => + throw ValidationException("A proctime window cannot provide a rowtime attribute.") + case _ => + throw new TableException("Unknown window type encountered. Please report this bug.") } case c: RexCall if isWindowProctime(c) => - if (isProctime || isRowtime) { - // replace expression by access to window proctime - builder.field(propertyName(window, "proctime")) - } else { - throw ValidationException("Proctime is not supported in a batch environment.") + windowType match { + case 'streamProctime | 'streamRowtime => + // replace expression by access to window proctime + builder.field(propertyName(window, "proctime")) + case 'batchRowtime => + throw ValidationException("PROCTIME window property is not supported in batch queries.") + case _ => + throw new TableException("Unknown window type encountered. 
Please report this bug.") } case c: RexCall => http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala index 129e0d3..346eefa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala @@ -67,23 +67,24 @@ class DataSetLogicalWindowAggregateRule } } + val timeField = getFieldReference(windowExpr.getOperands.get(0)) windowExpr.getOperator match { case BasicOperatorTable.TUMBLE => val interval = getOperandAsLong(windowExpr, 1) val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$")) + w.on(timeField).as(WindowReference("w$", Some(timeField.resultType))) case BasicOperatorTable.HOP => val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2)) val w = Slide .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)) .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$")) + w.on(timeField).as(WindowReference("w$", Some(timeField.resultType))) case BasicOperatorTable.SESSION => val gap = getOperandAsLong(windowExpr, 1) val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$")) + w.on(timeField).as(WindowReference("w$", Some(timeField.resultType))) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala index eaad885..254f36b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala @@ -23,7 +23,6 @@ import java.math.{BigDecimal => JBigDecimal} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.table.api.scala.{Session, Slide, Tumble} import org.apache.flink.table.api.{TableException, ValidationException, Window} import org.apache.flink.table.calcite.FlinkTypeFactory @@ -89,7 +88,7 @@ class DataStreamLogicalWindowAggregateRule val interval = getOperandAsLong(windowExpr, 1) val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - 
w.on(time).as(WindowReference("w$")) + w.on(time).as(WindowReference("w$", Some(time.resultType))) case BasicOperatorTable.HOP => val time = getOperandAsTimeIndicator(windowExpr, 0) @@ -98,14 +97,14 @@ class DataStreamLogicalWindowAggregateRule .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)) .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(time).as(WindowReference("w$")) + w.on(time).as(WindowReference("w$", Some(time.resultType))) case BasicOperatorTable.SESSION => val time = getOperandAsTimeIndicator(windowExpr, 0) val gap = getOperandAsLong(windowExpr, 1) val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(time).as(WindowReference("w$")) + w.on(time).as(WindowReference("w$", Some(time.resultType))) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index ce13cdc..bdfdbf5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -595,7 +595,7 @@ object AggregateUtil { window match { case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) => // tumbling time window - val (startPos, endPos, _) = computeWindowPropertyPos(properties) + val (startPos, endPos, timePos) = computeWindowPropertyPos(properties) if (doAllSupportPartialMerge(aggregates)) { // for incremental aggregations new DataSetTumbleTimeWindowAggReduceCombineFunction( @@ -604,6 +604,7 @@ object AggregateUtil { asLong(size), startPos, endPos, + timePos, keysAndAggregatesArity) } else { @@ -613,6 +614,7 @@ object AggregateUtil { asLong(size), startPos, endPos, + timePos, outputType.getFieldCount) } case TumblingGroupWindow(_, _, size) => @@ -622,17 +624,18 @@ object AggregateUtil { asLong(size)) case SessionGroupWindow(_, _, gap) => - val (startPos, endPos, _) = computeWindowPropertyPos(properties) + val (startPos, endPos, timePos) = computeWindowPropertyPos(properties) new DataSetSessionWindowAggReduceGroupFunction( genFinalAggFunction, keysAndAggregatesArity, startPos, endPos, + timePos, asLong(gap), isInputCombined) case SlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) => - val (startPos, endPos, _) = computeWindowPropertyPos(properties) + val (startPos, endPos, timePos) = computeWindowPropertyPos(properties) if (doAllSupportPartialMerge(aggregates)) { // for partial aggregations new DataSetSlideWindowAggReduceCombineFunction( @@ -641,6 +644,7 @@ object AggregateUtil { keysAndAggregatesArity, startPos, endPos, + timePos, asLong(size)) } else { @@ -650,6 +654,7 @@ object AggregateUtil { keysAndAggregatesArity, startPos, endPos, + timePos, asLong(size)) } @@ -659,6 +664,7 @@ object AggregateUtil { keysAndAggregatesArity, None, None, + None, asLong(size)) case _ => http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala index d99ca31..372fc0d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala @@ -44,6 +44,7 @@ import org.apache.flink.util.Collector * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos The relative window-start field position. * @param finalRowWindowEndPos The relative window-end field position. + * @param finalRowWindowRowtimePos The relative window-rowtime field position. * @param gap Session time window gap. */ class DataSetSessionWindowAggReduceGroupFunction( @@ -51,13 +52,14 @@ class DataSetSessionWindowAggReduceGroupFunction( keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], + finalRowWindowRowtimePos: Option[Int], gap: Long, isInputCombined: Boolean) extends RichGroupReduceFunction[Row, Row] with Compiler[GeneratedAggregations] with Logging { - private var collector: RowTimeWindowPropertyCollector = _ + private var collector: DataSetTimeWindowPropertyCollector = _ private val intermediateRowWindowStartPos = keysAndAggregatesArity private val intermediateRowWindowEndPos = keysAndAggregatesArity + 1 @@ -78,10 +80,10 @@ class DataSetSessionWindowAggReduceGroupFunction( output = function.createOutputRow() accumulators = function.createAccumulators() - collector = new RowTimeWindowPropertyCollector( + collector = new DataSetTimeWindowPropertyCollector( finalRowWindowStartPos, finalRowWindowEndPos, - None) + finalRowWindowRowtimePos) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala index 381d443..2da838f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala @@ -36,6 +36,8 @@ import org.apache.flink.types.Row * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos relative window-start position to last field of output row * @param finalRowWindowEndPos relative window-end position to last field of output row + * @param finalRowWindowRowtimePos relative window-rowtime position to the last field of the + * output row * @param windowSize size of the window, used to determine window-end for output row */ class DataSetSlideWindowAggReduceCombineFunction( @@ -44,12 +46,14 @@ class DataSetSlideWindowAggReduceCombineFunction( keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], + 
finalRowWindowRowtimePos: Option[Int], windowSize: Long) extends DataSetSlideWindowAggReduceGroupFunction( genFinalAggregations, keysAndAggregatesArity, finalRowWindowStartPos, finalRowWindowEndPos, + finalRowWindowRowtimePos, windowSize) with CombineFunction[Row, Row] { http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala index c64a522..474a09b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala @@ -36,6 +36,8 @@ import org.apache.flink.util.Collector * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos relative window-start position to last field of output row * @param finalRowWindowEndPos relative window-end position to last field of output row + * @param finalRowWindowRowtimePos relative window-rowtime position to the last field of the + * output row * @param windowSize size of the window, used to determine window-end for output row */ class DataSetSlideWindowAggReduceGroupFunction( @@ -43,12 +45,13 @@ class DataSetSlideWindowAggReduceGroupFunction( keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], + finalRowWindowRowtimePos: Option[Int], windowSize: Long) extends RichGroupReduceFunction[Row, Row] with Compiler[GeneratedAggregations] with Logging { - private var collector: RowTimeWindowPropertyCollector = _ + private var collector: DataSetTimeWindowPropertyCollector = _ protected val windowStartPos: Int = keysAndAggregatesArity private var output: Row = _ @@ -68,10 +71,10 @@ class DataSetSlideWindowAggReduceGroupFunction( output = function.createOutputRow() accumulators = function.createAccumulators() - collector = new RowTimeWindowPropertyCollector( + collector = new DataSetTimeWindowPropertyCollector( finalRowWindowStartPos, finalRowWindowEndPos, - None) + finalRowWindowRowtimePos) } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { @@ -93,7 +96,10 @@ class DataSetSlideWindowAggReduceGroupFunction( function.setAggregationResults(accumulators, output) // adds TimeWindow properties to output then emit output - if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) { + if (finalRowWindowStartPos.isDefined || + finalRowWindowEndPos.isDefined || + finalRowWindowRowtimePos.isDefined) { + collector.wrappedCollector = out collector.windowStart = record.getField(windowStartPos).asInstanceOf[Long] collector.windowEnd = collector.windowStart + windowSize http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala index 4a459b2..9eeab33 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala @@ -37,6 +37,8 @@ import org.apache.flink.types.Row * output row * @param windowEndPos The relative window-end field position to the last field of * output row + * @param windowRowtimePos The relative window-rowtime field position to the last field of + * output row * @param keysAndAggregatesArity The total arity of keys and aggregates */ class DataSetTumbleTimeWindowAggReduceCombineFunction( @@ -45,12 +47,14 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction( windowSize: Long, windowStartPos: Option[Int], windowEndPos: Option[Int], + windowRowtimePos: Option[Int], keysAndAggregatesArity: Int) extends DataSetTumbleTimeWindowAggReduceGroupFunction( genFinalAggregations, windowSize, windowStartPos, windowEndPos, + windowRowtimePos, keysAndAggregatesArity) with CombineFunction[Row, Row] { http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala index 7ae4c17..4e92148 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala @@ -35,6 +35,8 @@ import org.apache.flink.util.Collector * @param windowSize Tumbling time window size * @param windowStartPos The relative window-start field position to the last field of output row * @param windowEndPos The relative window-end field position to the last field of output row + * @param windowRowtimePos The relative window-rowtime field position to the last field of + * output row * @param keysAndAggregatesArity The total arity of keys and aggregates */ class DataSetTumbleTimeWindowAggReduceGroupFunction( @@ -42,12 +44,13 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction( windowSize: Long, windowStartPos: Option[Int], windowEndPos: Option[Int], + windowRowtimePos: Option[Int], keysAndAggregatesArity: Int) extends RichGroupReduceFunction[Row, Row] with Compiler[GeneratedAggregations] with Logging { - private var collector: RowTimeWindowPropertyCollector = _ + private var collector: DataSetTimeWindowPropertyCollector = _ protected var aggregateBuffer: Row = new Row(keysAndAggregatesArity + 1) private var output: Row = _ @@ -67,7 +70,10 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction( output = function.createOutputRow() accumulators = function.createAccumulators() - collector = new RowTimeWindowPropertyCollector(windowStartPos, windowEndPos, 
None) + collector = new DataSetTimeWindowPropertyCollector( + windowStartPos, + windowEndPos, + windowRowtimePos) } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala index 3c2e858..818418e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala @@ -42,10 +42,10 @@ class IncrementalAggregateAllTimeWindowFunction( extends IncrementalAggregateAllWindowFunction[TimeWindow]( finalRowArity) { - private var collector: CRowTimeWindowPropertyCollector = _ + private var collector: DataStreamTimeWindowPropertyCollector = _ override def open(parameters: Configuration): Unit = { - collector = new CRowTimeWindowPropertyCollector( + collector = new DataStreamTimeWindowPropertyCollector( windowStartOffset, windowEndOffset, windowRowtimeOffset) http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala index 69e4f7b..a908f49 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala @@ -48,10 +48,10 @@ class IncrementalAggregateTimeWindowFunction( numAggregates, finalRowArity) { - private var collector: CRowTimeWindowPropertyCollector = _ + private var collector: DataStreamTimeWindowPropertyCollector = _ override def open(parameters: Configuration): Unit = { - collector = new CRowTimeWindowPropertyCollector( + collector = new DataStreamTimeWindowPropertyCollector( windowStartOffset, windowEndOffset, windowRowtimeOffset) http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala index 16e4a0b..e9ecf0f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala @@ -40,6 +40,8 @@ abstract class TimeWindowPropertyCollector[T]( def getRow(record: T): Row + def setRowtimeAttribute(pos: Int): Unit + override def collect(record: T): Unit = { output = getRow(record) @@ -57,9 +59,7 @@ abstract class TimeWindowPropertyCollector[T]( } if (windowRowtimeOffset.isDefined) { - output.setField( - lastFieldPos + windowRowtimeOffset.get, - windowEnd - 1) + setRowtimeAttribute(lastFieldPos + windowRowtimeOffset.get) } wrappedCollector.collect(record) @@ -68,20 +68,28 @@ abstract class TimeWindowPropertyCollector[T]( override def close(): Unit = wrappedCollector.close() } -class RowTimeWindowPropertyCollector( +final class DataSetTimeWindowPropertyCollector( startOffset: Option[Int], endOffset: Option[Int], rowtimeOffset: Option[Int]) extends TimeWindowPropertyCollector[Row](startOffset, endOffset, rowtimeOffset) { override def getRow(record: Row): Row = record + + override def setRowtimeAttribute(pos: Int): Unit = { + output.setField(pos, SqlFunctions.internalToTimestamp(windowEnd - 1)) + } } -class CRowTimeWindowPropertyCollector( +final class DataStreamTimeWindowPropertyCollector( startOffset: Option[Int], endOffset: Option[Int], rowtimeOffset: Option[Int]) extends TimeWindowPropertyCollector[CRow](startOffset, endOffset, rowtimeOffset) { override def getRow(record: CRow): Row = record.row + + override def setRowtimeAttribute(pos: Int): Unit = { + output.setField(pos, windowEnd - 1) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 6c6be0b..535fd6e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.validate -import org.apache.calcite.sql.`type`.OperandTypes +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeTransforms} import org.apache.calcite.sql.fun.{SqlGroupFunction, SqlStdOperatorTable} import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable} import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlOperator, SqlOperatorTable} @@ -442,7 +442,13 @@ object BasicOperatorTable { val TUMBLE_START: SqlGroupFunction = TUMBLE.auxiliary(SqlKind.TUMBLE_START) val TUMBLE_END: SqlGroupFunction = TUMBLE.auxiliary(SqlKind.TUMBLE_END) val TUMBLE_ROWTIME: SqlGroupFunction = - TUMBLE.auxiliary("TUMBLE_ROWTIME", SqlKind.OTHER_FUNCTION) + new SqlGroupFunction( + "TUMBLE_ROWTIME", + SqlKind.OTHER_FUNCTION, + TUMBLE, + // ensure that returned rowtime is always NOT_NULLABLE + ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NOT_NULLABLE), + TUMBLE.getOperandTypeChecker) val TUMBLE_PROCTIME: SqlGroupFunction = TUMBLE.auxiliary("TUMBLE_PROCTIME", SqlKind.OTHER_FUNCTION) @@ -461,7 +467,14 @@ object BasicOperatorTable { } val HOP_START: SqlGroupFunction = HOP.auxiliary(SqlKind.HOP_START) val HOP_END: SqlGroupFunction = HOP.auxiliary(SqlKind.HOP_END) - val HOP_ROWTIME: SqlGroupFunction 
= HOP.auxiliary("HOP_ROWTIME", SqlKind.OTHER_FUNCTION) + val HOP_ROWTIME: SqlGroupFunction = + new SqlGroupFunction( + "HOP_ROWTIME", + SqlKind.OTHER_FUNCTION, + HOP, + // ensure that returned rowtime is always NOT_NULLABLE + ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NOT_NULLABLE), + HOP.getOperandTypeChecker) val HOP_PROCTIME: SqlGroupFunction = HOP.auxiliary("HOP_PROCTIME", SqlKind.OTHER_FUNCTION) val SESSION: SqlGroupFunction = new SqlGroupFunction( @@ -478,7 +491,13 @@ object BasicOperatorTable { val SESSION_START: SqlGroupFunction = SESSION.auxiliary(SqlKind.SESSION_START) val SESSION_END: SqlGroupFunction = SESSION.auxiliary(SqlKind.SESSION_END) val SESSION_ROWTIME: SqlGroupFunction = - SESSION.auxiliary("SESSION_ROWTIME", SqlKind.OTHER_FUNCTION) + new SqlGroupFunction( + "SESSION_ROWTIME", + SqlKind.OTHER_FUNCTION, + SESSION, + // ensure that returned rowtime is always NOT_NULLABLE + ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NOT_NULLABLE), + SESSION.getOperandTypeChecker) val SESSION_PROCTIME: SqlGroupFunction = SESSION.auxiliary("SESSION_PROCTIME", SqlKind.OTHER_FUNCTION) http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala index cb31866..59aee9f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala @@ -21,7 +21,6 @@ package org.apache.flink.table.api.batch.sql import java.sql.Timestamp import org.apache.flink.api.scala._ -import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ import org.apache.flink.table.plan.logical._ @@ -63,6 +62,7 @@ class GroupWindowTest extends TableTestBase { "SELECT " + " TUMBLE_START(ts, INTERVAL '4' MINUTE), " + " TUMBLE_END(ts, INTERVAL '4' MINUTE), " + + " TUMBLE_ROWTIME(ts, INTERVAL '4' MINUTE), " + " c, " + " SUM(a) AS sumA, " + " MIN(b) AS minB " + @@ -78,9 +78,10 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "c"), term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)), term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " + - "start('w$) AS w$start, end('w$) AS w$end") + "start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime") ), - term("select", "CAST(w$start) AS EXPR$0, CAST(w$end) AS EXPR$1, c, sumA, minB") + term("select", "CAST(w$start) AS EXPR$0, CAST(w$end) AS EXPR$1, " + + "w$rowtime AS EXPR$2, c, sumA, minB") ) util.verifySql(sqlQuery, expected) @@ -149,6 +150,7 @@ class GroupWindowTest extends TableTestBase { " c, " + " HOP_END(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " + " HOP_START(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " + + " HOP_ROWTIME(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " + " SUM(a) AS sumA, " + " AVG(b) AS avgB " + "FROM T " + @@ -164,9 +166,10 @@ class GroupWindowTest extends TableTestBase { term("window", SlidingGroupWindow('w$, 'ts, 10800000.millis, 3600000.millis)), term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " + - "start('w$) 
AS w$start, end('w$) AS w$end") + "start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime") ), - term("select", "c, CAST(w$end) AS EXPR$1, CAST(w$start) AS EXPR$2, sumA, avgB") + term("select", "c, CAST(w$end) AS EXPR$1, CAST(w$start) AS EXPR$2, " + + "w$rowtime AS EXPR$3, sumA, avgB") ) util.verifySql(sqlQuery, expected) @@ -205,6 +208,7 @@ class GroupWindowTest extends TableTestBase { " c, d, " + " SESSION_START(ts, INTERVAL '12' HOUR), " + " SESSION_END(ts, INTERVAL '12' HOUR), " + + " SESSION_ROWTIME(ts, INTERVAL '12' HOUR), " + " SUM(a) AS sumA, " + " MIN(b) AS minB " + "FROM T " + @@ -219,9 +223,10 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "c, d"), term("window", SessionGroupWindow('w$, 'ts, 43200000.millis)), term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " + - "start('w$) AS w$start, end('w$) AS w$end") + "start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime") ), - term("select", "c, d, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3, sumA, minB") + term("select", "c, d, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3, " + + "w$rowtime AS EXPR$4, sumA, minB") ) util.verifySql(sqlQuery, expected) @@ -250,7 +255,7 @@ class GroupWindowTest extends TableTestBase { ), term("groupBy", "c"), term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)), - term("select", "c, start('w$) AS w$start, end('w$) AS w$end") + term("select", "c, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime") ), term("select", "CAST(w$end) AS EXPR$0") ) @@ -288,7 +293,8 @@ class GroupWindowTest extends TableTestBase { "COUNT(*) AS EXPR$0", "SUM(a) AS $f1", "start('w$) AS w$start", - "end('w$) AS w$end") + "end('w$) AS w$end, " + + "rowtime('w$) AS w$rowtime") ), term("select", "EXPR$0", "CAST(w$start) AS EXPR$1"), term("where", http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala index cbf3029..e32b0a9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.api.batch.sql.validation import java.sql.Timestamp import org.apache.flink.api.scala._ -import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge} +import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.utils.TableTestBase @@ -80,21 +80,6 @@ class GroupWindowValidationTest extends TableTestBase { util.verifySql(sql, "n/a") } - @Test(expected = classOf[TableException]) - def testWindowRowtime(): Unit = { - val util = batchTestUtil() - util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts) - - val sqlQuery = - "SELECT " + - " TUMBLE_ROWTIME(ts, INTERVAL '4' MINUTE)" + - "FROM T " + - "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c" - - // should 
fail because ROWTIME properties are not yet supported in batch - util.verifySql(sqlQuery, "FAIL") - } - @Test(expected = classOf[ValidationException]) def testWindowProctime(): Unit = { val util = batchTestUtil() http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala index 6a2f1a7..ad44e09 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala @@ -18,6 +18,8 @@ package org.apache.flink.table.api.batch.table +import java.sql.Timestamp + import org.apache.flink.api.scala._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ @@ -146,6 +148,50 @@ class GroupWindowTest extends TableTestBase { util.verifyTable(windowedTable, expected) } + @Test + def testLongEventTimeTumblingGroupWindowWithProperties(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Long, Int, String)]('ts, 'int, 'string) + + val windowedTable = table + .window(Tumble over 2.hours on 'ts as 'w) + .groupBy('w, 'string) + .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime) + + val expected = unaryNode( + "DataSetWindowAggregate", + batchTableNode(0), + term("groupBy", "string"), + term("window", TumblingGroupWindow(WindowReference("w"), 'ts, 2.hours)), + term("select", "string", "COUNT(int) AS TMP_0", + "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testTimestampEventTimeTumblingGroupWindowWithProperties(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Timestamp, Int, String)]('ts, 'int, 'string) + + val windowedTable = table + .window(Tumble over 2.hours on 'ts as 'w) + .groupBy('w, 'string) + .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime) + + val expected = unaryNode( + "DataSetWindowAggregate", + batchTableNode(0), + term("groupBy", "string"), + term("window", TumblingGroupWindow(WindowReference("w"), 'ts, 2.hours)), + term("select", "string", "COUNT(int) AS TMP_0", + "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } + //=============================================================================================== // Sliding Windows //=============================================================================================== @@ -268,6 +314,50 @@ class GroupWindowTest extends TableTestBase { util.verifyTable(windowedTable, expected) } + @Test + def testLongEventTimeSlidingGroupWindowWithProperties(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Long, Int, String)]('ts, 'int, 'string) + + val windowedTable = table + .window(Slide over 1.hour every 10.minutes on 'ts as 'w) + .groupBy('w, 'string) + .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime) + + val expected = unaryNode( + "DataSetWindowAggregate", + batchTableNode(0), + term("groupBy", "string"), + term("window", SlidingGroupWindow(WindowReference("w"), 'ts, 1.hour, 10.minutes)), 
+ term("select", "string", "COUNT(int) AS TMP_0", + "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testTimestampEventTimeSlidingGroupWindowWithProperties(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Timestamp, Int, String)]('ts, 'int, 'string) + + val windowedTable = table + .window(Slide over 1.hour every 10.minutes on 'ts as 'w) + .groupBy('w, 'string) + .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime) + + val expected = unaryNode( + "DataSetWindowAggregate", + batchTableNode(0), + term("groupBy", "string"), + term("window", SlidingGroupWindow(WindowReference("w"), 'ts, 1.hour, 10.minutes)), + term("select", "string", "COUNT(int) AS TMP_0", + "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } + //=============================================================================================== // Session Windows //=============================================================================================== @@ -315,4 +405,48 @@ class GroupWindowTest extends TableTestBase { util.verifyTable(windowedTable, expected) } + + @Test + def testLongEventTimeSessionGroupWindowWithProperties(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Long, Int, String)]('ts, 'int, 'string) + + val windowedTable = table + .window(Session withGap 30.minutes on 'ts as 'w) + .groupBy('w, 'string) + .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime) + + val expected = unaryNode( + "DataSetWindowAggregate", + batchTableNode(0), + term("groupBy", "string"), + term("window", SessionGroupWindow(WindowReference("w"), 'ts, 30.minutes)), + term("select", "string", "COUNT(int) AS TMP_0", + "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testTimestampEventTimeSessionGroupWindowWithProperties(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Timestamp, Int, String)]('ts, 'int, 'string) + + val windowedTable = table + .window(Session withGap 30.minutes on 'ts as 'w) + .groupBy('w, 'string) + .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime) + + val expected = unaryNode( + "DataSetWindowAggregate", + batchTableNode(0), + term("groupBy", "string"), + term("window", SessionGroupWindow(WindowReference("w"), 'ts, 30.minutes)), + term("select", "string", "COUNT(int) AS TMP_0", + "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala index aa934c6..b105ec02 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala @@ -18,8 +18,7 @@ package org.apache.flink.table.runtime.batch.sql -import java.sql.Timestamp - +import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTimestamp} import 
org.apache.flink.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.table.api.TableEnvironment @@ -312,7 +311,7 @@ class AggregateITCase( val ds = CollectionDataSets.get3TupleDataSet(env) // create timestamps - .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000))) + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() @@ -341,10 +340,10 @@ class AggregateITCase( val ds = CollectionDataSets.get3TupleDataSet(env) // create timestamps - .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000))) + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) - val result = tEnv.sql(sqlQuery).toDataSet[Row].collect() + val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() val expected = Seq( "1,{1=1}", "2,{2=1}", "2,{2=1}", @@ -358,6 +357,41 @@ class AggregateITCase( } @Test + def testTumbleWindowWithProperties(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = + "SELECT b, COUNT(a), " + + "TUMBLE_START(ts, INTERVAL '5' SECOND), " + + "TUMBLE_END(ts, INTERVAL '5' SECOND), " + + "TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND)" + + "FROM T " + + "GROUP BY b, TUMBLE(ts, INTERVAL '5' SECOND)" + + val ds = CollectionDataSets.get3TupleDataSet(env) + // min time unit is seconds + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) + tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) + + val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() + val expected = Seq( + "1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999", + "2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999", + "3,1,1970-01-01 00:00:00.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999", + "3,2,1970-01-01 00:00:05.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999", + "4,3,1970-01-01 00:00:05.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999", + "4,1,1970-01-01 00:00:10.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999", + "5,4,1970-01-01 00:00:10.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999", + "5,1,1970-01-01 00:00:15.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999", + "6,4,1970-01-01 00:00:15.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999", + "6,2,1970-01-01 00:00:20.0,1970-01-01 00:00:25.0,1970-01-01 00:00:24.999" + ).mkString("\n") + + TestBaseUtils.compareResultAsText(result.asJava, expected) + } + + @Test def testHopWindowAggregate(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -365,8 +399,6 @@ class AggregateITCase( tEnv.registerFunction("countFun", new CountAggFunction) tEnv.registerFunction("wAvgWithMergeAndReset", new WeightedAvgWithMergeAndReset) - env.setParallelism(1) - val sqlQuery = "SELECT b, SUM(a), countFun(c), wAvgWithMergeAndReset(b, a), wAvgWithMergeAndReset(a, a)" + "FROM T " + @@ -374,7 +406,7 @@ class AggregateITCase( val ds = CollectionDataSets.get3TupleDataSet(env) // create timestamps - .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000))) + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() @@ -391,6 +423,48 @@ class AggregateITCase( } @Test + def testHopWindowWithProperties(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = + "SELECT 
b, COUNT(a), " + + "HOP_START(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND), " + + "HOP_END(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND), " + + "HOP_ROWTIME(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND) " + + "FROM T " + + "GROUP BY b, HOP(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND)" + + val ds = CollectionDataSets.get3TupleDataSet(env) + // create timestamps + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) + tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) + + val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() + val expected = Seq( + "1,1,1969-12-31 23:59:55.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999", + "2,2,1969-12-31 23:59:55.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999", + "3,1,1969-12-31 23:59:55.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999", + "1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999", + "2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999", + "3,3,1970-01-01 00:00:00.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999", + "4,3,1970-01-01 00:00:00.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999", + "3,2,1970-01-01 00:00:05.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999", + "4,4,1970-01-01 00:00:05.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999", + "5,4,1970-01-01 00:00:05.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999", + "4,1,1970-01-01 00:00:10.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999", + "5,5,1970-01-01 00:00:10.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999", + "6,4,1970-01-01 00:00:10.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999", + "5,1,1970-01-01 00:00:15.0,1970-01-01 00:00:25.0,1970-01-01 00:00:24.999", + "6,6,1970-01-01 00:00:15.0,1970-01-01 00:00:25.0,1970-01-01 00:00:24.999", + "6,2,1970-01-01 00:00:20.0,1970-01-01 00:00:30.0,1970-01-01 00:00:29.999" + ).mkString("\n") + + TestBaseUtils.compareResultAsText(result.asJava, expected) + } + + @Test def testSessionWindowAggregate(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -398,8 +472,6 @@ class AggregateITCase( tEnv.registerFunction("countFun", new CountAggFunction) tEnv.registerFunction("wAvgWithMergeAndReset", new WeightedAvgWithMergeAndReset) - env.setParallelism(1) - val sqlQuery = "SELECT MIN(a), MAX(a), SUM(a), countFun(c), wAvgWithMergeAndReset(b, a), " + "wAvgWithMergeAndReset(a, a)" + @@ -409,7 +481,7 @@ class AggregateITCase( val ds = CollectionDataSets.get3TupleDataSet(env) // create timestamps .filter(x => (x._2 % 2) == 0) - .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000))) + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() @@ -420,4 +492,33 @@ class AggregateITCase( TestBaseUtils.compareResultAsText(result.asJava, expected) } + + @Test + def testSessionWindowWithProperties(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = + "SELECT COUNT(a), " + + "SESSION_START(ts, INTERVAL '4' SECOND), " + + "SESSION_END(ts, INTERVAL '4' SECOND), " + + "SESSION_ROWTIME(ts, INTERVAL '4' SECOND) " + + "FROM T " + + "GROUP BY SESSION(ts, INTERVAL '4' SECOND)" + + val ds = CollectionDataSets.get3TupleDataSet(env) + // create timestamps + .filter(x => (x._2 % 2) == 0) + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) + tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) + + val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() + val expected = Seq( + "6,1970-01-01 
00:00:02.0,1970-01-01 00:00:14.0,1970-01-01 00:00:13.999", + "6,1970-01-01 00:00:16.0,1970-01-01 00:00:25.0,1970-01-01 00:00:24.999" + ).mkString("\n") + + TestBaseUtils.compareResultAsText(result.asJava, expected) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala index 718bc5c..3d9223e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala @@ -22,7 +22,7 @@ import java.math.BigDecimal import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{TableEnvironment, ValidationException} +import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode @@ -101,14 +101,15 @@ class GroupWindowITCase( val windowedTable = table .window(Tumble over 5.milli on 'long as 'w) .groupBy('w, 'string) - .select('string, 'int.sum, 'w.start, 'w.end) + .select('string, 'int.sum, 'w.start, 'w.end, 'w.rowtime) - val expected = "Hello world,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" + - "Hello world,4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02\n" + - "Hello,7,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" + - "Hello,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" + - "Hallo,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" + - "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" + val expected = + "Hello world,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" + + "Hello world,4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,1970-01-01 00:00:00.019\n" + + "Hello,7,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" + + "Hello,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" + + "Hallo,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" + + "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" val results = windowedTable.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) @@ -126,11 +127,12 @@ class GroupWindowITCase( val windowedTable = table .window(Tumble over 5.milli on 'long as 'w) .groupBy('w) - .select('int.sum, 'w.start, 'w.end) + .select('int.sum, 'w.start, 'w.end, 'w.rowtime) - val expected = "10,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" + - "6,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" + - "4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02\n" + val expected = + "10,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" + + "6,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" + + "4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,1970-01-01 00:00:00.019\n" val results = windowedTable.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) @@ -147,15 +149,16 @@ class 
GroupWindowITCase( val windowedTable = table .window(Session withGap 7.milli on 'long as 'w) .groupBy('string, 'w) - .select('string, 'string.count, 'w.start, 'w.end) + .select('string, 'string.count, 'w.start, 'w.end, 'w.rowtime) val results = windowedTable.toDataSet[Row].collect() - val expected = "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009\n" + - "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.015\n" + - "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.023\n" + - "Hello,3,1970-01-01 00:00:00.003,1970-01-01 00:00:00.014\n" + - "Hi,1,1970-01-01 00:00:00.001,1970-01-01 00:00:00.008" + val expected = + "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009,1970-01-01 00:00:00.008\n" + + "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.015,1970-01-01 00:00:00.014\n" + + "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.023,1970-01-01 00:00:00.022\n" + + "Hello,3,1970-01-01 00:00:00.003,1970-01-01 00:00:00.014,1970-01-01 00:00:00.013\n" + + "Hi,1,1970-01-01 00:00:00.001,1970-01-01 00:00:00.008,1970-01-01 00:00:00.007" TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -167,18 +170,20 @@ class GroupWindowITCase( .fromCollection(data) .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) - val results =table + val results = table .window(Session withGap 2.milli on 'long as 'w) .groupBy('w) - .select('string.count, 'w.start, 'w.end).toDataSet[Row].collect() + .select('string.count, 'w.start, 'w.end, 'w.rowtime) + .toDataSet[Row].collect() - val expected = "4,1970-01-01 00:00:00.001,1970-01-01 00:00:00.006\n" + - "2,1970-01-01 00:00:00.007,1970-01-01 00:00:00.01\n" + - "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.018" + val expected = + "4,1970-01-01 00:00:00.001,1970-01-01 00:00:00.006,1970-01-01 00:00:00.005\n" + + "2,1970-01-01 00:00:00.007,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" + + "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.018,1970-01-01 00:00:00.017" TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) + @Test def testMultiGroupWindow(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) @@ -186,14 +191,24 @@ class GroupWindowITCase( val table = env .fromCollection(data) .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) - table - .window(Tumble over 5.milli on 'long as 'w) + + val results = table + .window(Tumble over 2.milli on 'long as 'w) .groupBy('w, 'string) - .select('string, 'int.count) - .window( Slide over 5.milli every 1.milli on 'int as 'w2) - .groupBy('w2) - .select('string) - .toDataSet[Row] + .select('string, 'int.count as 'cnt, 'w.rowtime as 'time) + .window(Tumble over 6.milli on 'time as 'w2) + .groupBy('w2, 'string) + .select('string, 'cnt.sum as 'cnt, 'w2.end) + .toDataSet[Row].collect() + + val expected = + "Hallo,1,1970-01-01 00:00:00.006\n" + + "Hello world,1,1970-01-01 00:00:00.012\n" + + "Hello world,1,1970-01-01 00:00:00.018\n" + + "Hello,1,1970-01-01 00:00:00.012\n" + + "Hello,2,1970-01-01 00:00:00.006\n" + + "Hi,1,1970-01-01 00:00:00.006\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) } // ---------------------------------------------------------------------------------------------- @@ -230,18 +245,18 @@ class GroupWindowITCase( val windowedTable = table .window(Slide over 5.milli every 2.milli on 'long as 'w) .groupBy('w) - .select('int.count, 'w.start, 'w.end) + .select('int.count, 'w.start, 'w.end, 
'w.rowtime) val expected = - "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" + - "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" + - "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019\n" + - "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" + - "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003\n" + - "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011\n" + - "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007\n" + - "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" + - "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005" + "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013,1970-01-01 00:00:00.012\n" + + "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017,1970-01-01 00:00:00.016\n" + + "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019,1970-01-01 00:00:00.018\n" + + "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021,1970-01-01 00:00:00.02\n" + + "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003,1970-01-01 00:00:00.002\n" + + "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011,1970-01-01 00:00:00.01\n" + + "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007,1970-01-01 00:00:00.006\n" + + "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009,1970-01-01 00:00:00.008\n" + + "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004" val results = windowedTable.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index 32e3724..c49af5c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -418,7 +418,9 @@ class SqlITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val resultHopStartEndWithHaving = tEnv.sql(sqlQueryHopStartEndWithHaving).toAppendStream[Row] + val resultHopStartEndWithHaving = tEnv + .sqlQuery(sqlQueryHopStartEndWithHaving) + .toAppendStream[Row] resultHopStartEndWithHaving.addSink(new StreamITCase.StringSink[Row]) env.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala index a9d4e44..1eebeee 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala @@ -254,25 +254,25 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase { val windowedTable = table .window(Slide over 5.milli every 2.milli on 'long as 'w) .groupBy('w) - .select('int.count, 'w.start, 'w.end) + .select('int.count, 'w.start, 'w.end, 'w.rowtime) val results = windowedTable.toAppendStream[Row] results.addSink(new 
StreamITCase.StringSink[Row]) env.execute() val expected = Seq( - "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013", - "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017", - "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019", - "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021", - "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003", - "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011", - "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007", - "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009", - "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005", - "1,1970-01-01 00:00:00.028,1970-01-01 00:00:00.033", - "1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.035", - "1,1970-01-01 00:00:00.032,1970-01-01 00:00:00.037") + "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013,1970-01-01 00:00:00.012", + "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017,1970-01-01 00:00:00.016", + "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019,1970-01-01 00:00:00.018", + "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021,1970-01-01 00:00:00.02", + "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003,1970-01-01 00:00:00.002", + "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011,1970-01-01 00:00:00.01", + "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007,1970-01-01 00:00:00.006", + "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009,1970-01-01 00:00:00.008", + "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004", + "1,1970-01-01 00:00:00.028,1970-01-01 00:00:00.033,1970-01-01 00:00:00.032", + "1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.035,1970-01-01 00:00:00.034", + "1,1970-01-01 00:00:00.032,1970-01-01 00:00:00.037,1970-01-01 00:00:00.036") assertEquals(expected.sorted, StreamITCase.testResults.sorted) }
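----------------------------------------------------------------------
Notes on the change. The sketches below are illustrations written
against this commit, not part of it; anything not visible in the diff
above is labeled as an assumption.

The tests pin down the semantics of the new property: on batch tables,
'w.rowtime evaluates to the window's inclusive upper bound, one
millisecond before 'w.end (e.g. end 1970-01-01 00:00:05.0, rowtime
1970-01-01 00:00:04.999). A minimal, self-contained batch Table API
program in the style of GroupWindowITCase; table and field names are
made up for illustration:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object BatchRowtimeExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Hypothetical input: (event timestamp in millis, value, key).
    val table = env
      .fromElements((1000L, 1, "a"), (2000L, 2, "a"), (7000L, 3, "b"))
      .toTable(tEnv, 'ts, 'v, 'k)

    val result = table
      .window(Tumble over 5000.milli on 'ts as 'w)
      .groupBy('w, 'k)
      // 'w.rowtime is the inclusive upper bound of the window: 'w.end - 1ms.
      .select('k, 'v.sum, 'w.start, 'w.end, 'w.rowtime)

    result.toDataSet[Row].print()
  }
}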
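On the SQL side, the batch planner now accepts the auxiliary group
functions TUMBLE_ROWTIME, HOP_ROWTIME, and SESSION_ROWTIME next to the
existing *_START/*_END functions; each takes the same operands as its
grouping function, as the ITCases above show. (The touched tests are
also migrated from tEnv.sql(...) to the newer tEnv.sqlQuery(...).)
Condensed from testTumbleWindowWithProperties:

val sqlQuery =
  "SELECT b, COUNT(a), " +
  "  TUMBLE_START(ts, INTERVAL '5' SECOND), " +
  "  TUMBLE_END(ts, INTERVAL '5' SECOND), " +
  "  TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND) " +
  "FROM T " +
  "GROUP BY b, TUMBLE(ts, INTERVAL '5' SECOND)"

val result = tEnv.sqlQuery(sqlQuery)  // tEnv and table T as registered above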
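The second fix bundled here, FLINK-7542, is why every
new Timestamp(x._1 * 1000) in AggregateITCase becomes
toTimestamp(x._1 * 1000), an alias for Calcite's
SqlFunctions.internalToTimestamp: java.sql.Timestamp.toString renders
in the JVM's default time zone, so expected strings built with the
plain constructor only matched when the tests ran in UTC. A rough
sketch of what the Calcite helper does (an approximation, not code
taken from Calcite or from this commit):

import java.sql.Timestamp
import java.util.TimeZone

// Shift internal UTC millis by the local offset so that Timestamp.toString,
// which renders in local time, prints the intended UTC wall-clock value.
def internalToTimestamp(millis: Long): Timestamp =
  new Timestamp(millis - TimeZone.getDefault.getOffset(millis))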
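Finally, testMultiGroupWindow flips from expecting a ValidationException
to asserting results: because 'w.rowtime is itself a valid event-time
attribute, a second window can be defined on top of the first
aggregation. The pattern, as exercised by that test:

val cascaded = table
  .window(Tumble over 2.milli on 'long as 'w)
  .groupBy('w, 'string)
  .select('string, 'int.count as 'cnt, 'w.rowtime as 'time)
  // the second window groups on the first window's rowtime attribute
  .window(Tumble over 6.milli on 'time as 'w2)
  .groupBy('w2, 'string)
  .select('string, 'cnt.sum as 'cnt, 'w2.end)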