flink-commits mailing list archives

From fhue...@apache.org
Subject [5/5] flink git commit: [FLINK-4691] [table] Rework window property extraction.
Date Wed, 26 Oct 2016 21:11:27 GMT
[FLINK-4691] [table] Rework window property extraction.

- Deduplicate aggregations and window properties in Table API
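
A sketch of the effect, modeled directly on the GroupWindowTest case added below (illustrative only, not part of the commit): the same aggregate and window properties can now appear several times in a select(), and each distinct aggregation and property is evaluated only once and then re-referenced by name.

    // Illustrative usage (flink-table 1.x Scala API, as exercised by the tests below):
    // 'int.sum is computed once and shared by 's1 and 's2; 'w.start and 'w.end are
    // each extracted once, even though they are selected multiple times.
    val windowedTable = table
      .groupBy('string)
      .window(Tumble over 5.millis on 'rowtime as 'w)
      .select('string,
        'int.sum + 1 as 's1, 'int.sum + 3 as 's2,
        'w.start as 'x, 'w.start as 'x2,
        'w.end as 'x3, 'w.end)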


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de03e0ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de03e0ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de03e0ce

Branch: refs/heads/master
Commit: de03e0cea16006f04b5f62d3ff70f583f5db9e4f
Parents: 44f3977
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Oct 26 01:08:43 2016 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Oct 26 23:03:53 2016 +0200

----------------------------------------------------------------------
 .../api/table/plan/ProjectionTranslator.scala   | 152 +++++++++++++------
 .../nodes/datastream/DataStreamAggregate.scala  |  82 ++++++++--
 .../AggregateAllTimeWindowFunction.scala        |  53 +++++++
 .../aggregate/AggregateAllWindowFunction.scala  |  19 +--
 .../aggregate/AggregateTimeWindowFunction.scala |  57 +++++++
 .../aggregate/AggregateWindowFunction.scala     |  19 +--
 .../runtime/aggregate/PropertyCollector.scala   |  42 -----
 .../aggregate/TimeWindowPropertyCollector.scala |  54 +++++++
 .../table/runtime/aggregate/WindowEndRead.scala |  38 -----
 .../runtime/aggregate/WindowPropertyRead.scala  |  33 ----
 .../runtime/aggregate/WindowStartRead.scala     |  38 -----
 .../org/apache/flink/api/table/table.scala      |  40 ++---
 .../scala/stream/table/AggregationsITCase.scala |  18 +--
 .../scala/stream/table/GroupWindowTest.scala    |  48 +++++-
 14 files changed, 424 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
index 2299bd1..d09b03e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
@@ -27,78 +27,144 @@ import scala.collection.mutable.ListBuffer
 object ProjectionTranslator {
 
   /**
-    * Extracts all aggregation and property expressions (zero, one, or more) from an expression,
-    * and replaces the original expressions by field accesses expressions.
+    * Extracts and deduplicates all aggregation and window property expressions (zero, one, or more)
+    * from all expressions and replaces the original expressions by field access expressions.
+    *
+    * @param exprs a list of expressions to convert
+    * @param tableEnv the TableEnvironment
+    * @return a Tuple3, the first field contains the converted expressions, the second field the
+    *         extracted and deduplicated aggregations, and the third field the extracted and
+    *         deduplicated window properties.
     */
   def extractAggregationsAndProperties(
-      exp: Expression,
+      exprs: Seq[Expression],
       tableEnv: TableEnvironment)
-    : (Expression, List[NamedExpression], List[NamedExpression]) = {
+  : (Seq[NamedExpression], Seq[NamedExpression], Seq[NamedExpression]) = {
+
+    val (aggNames, propNames) =
+      exprs.foldLeft( (Map[Expression, String](), Map[Expression, String]()) ) {
+        (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2)
+      }
+
+    val replaced = exprs
+      .map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames))
+      .map {
+        case e: Expression => UnresolvedAlias(e)
+      }
+    val aggs = aggNames.map( a => Alias(a._1, a._2)).toSeq
+    val props = propNames.map( p => Alias(p._1, p._2)).toSeq
+
+    (replaced, aggs, props)
+  }
+
+  /** Identifies and deduplicates aggregation functions and window properties. */
+  private def identifyAggregationsAndProperties(
+      exp: Expression,
+      tableEnv: TableEnvironment,
+      aggNames: Map[Expression, String],
+      propNames: Map[Expression, String]) : (Map[Expression, String], Map[Expression, String]) = {
+
+    exp match {
+      case agg: Aggregation =>
+        if (aggNames contains agg) {
+          (aggNames, propNames)
+        } else {
+          (aggNames + (agg -> tableEnv.createUniqueAttributeName()), propNames)
+        }
+      case prop: WindowProperty =>
+        if (propNames contains prop) {
+          (aggNames, propNames)
+        } else {
+          (aggNames, propNames + (prop -> tableEnv.createUniqueAttributeName()))
+        }
+      case l: LeafExpression =>
+        (aggNames, propNames)
+      case u: UnaryExpression =>
+        identifyAggregationsAndProperties(u.child, tableEnv, aggNames, propNames)
+      case b: BinaryExpression =>
+        val l = identifyAggregationsAndProperties(b.left, tableEnv, aggNames, propNames)
+        identifyAggregationsAndProperties(b.right, tableEnv, l._1, l._2)
+
+      // Function calls
+      case c @ Call(name, args) =>
+        args.foldLeft((aggNames, propNames)){
+          (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2)
+        }
+
+      case sfc @ ScalarFunctionCall(clazz, args) =>
+        args.foldLeft((aggNames, propNames)){
+          (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2)
+        }
+
+      // General expression
+      case e: Expression =>
+        e.productIterator.foldLeft((aggNames, propNames)){
+          (x, y) => y match {
+            case e: Expression => identifyAggregationsAndProperties(e, tableEnv, x._1, x._2)
+            case _ => (x._1, x._2)
+          }
+        }
+    }
+  }
+
+  /** Replaces aggregations and window properties by named field references. */
+  private def replaceAggregationsAndProperties(
+      exp: Expression,
+      tableEnv: TableEnvironment,
+      aggNames: Map[Expression, String],
+      propNames: Map[Expression, String]) : Expression = {
 
     exp match {
       case agg: Aggregation =>
-        val name = tableEnv.createUniqueAttributeName()
-        val aggCall = Alias(agg, name)
-        val fieldExp = UnresolvedFieldReference(name)
-        (fieldExp, List(aggCall), Nil)
+        val name = aggNames(agg)
+        Alias(UnresolvedFieldReference(name), tableEnv.createUniqueAttributeName())
       case prop: WindowProperty =>
-        val name = tableEnv.createUniqueAttributeName()
-        val propCall = Alias(prop, name)
-        val fieldExp = UnresolvedFieldReference(name)
-        (fieldExp, Nil, List(propCall))
+        val name = propNames(prop)
+        Alias(UnresolvedFieldReference(name), tableEnv.createUniqueAttributeName())
       case n @ Alias(agg: Aggregation, name) =>
-        val fieldExp = UnresolvedFieldReference(name)
-        (fieldExp, List(n), Nil)
+        val aName = aggNames(agg)
+        Alias(UnresolvedFieldReference(aName), name)
       case n @ Alias(prop: WindowProperty, name) =>
-        val fieldExp = UnresolvedFieldReference(name)
-        (fieldExp, Nil, List(n))
-      case l: LeafExpression =>
-        (l, Nil, Nil)
+        val pName = propNames(prop)
+        Alias(UnresolvedFieldReference(pName), name)
+      case l: LeafExpression => l
       case u: UnaryExpression =>
-        val c = extractAggregationsAndProperties(u.child, tableEnv)
-        (u.makeCopy(Array(c._1)), c._2, c._3)
+        val c = replaceAggregationsAndProperties(u.child, tableEnv, aggNames, propNames)
+        u.makeCopy(Array(c))
       case b: BinaryExpression =>
-        val l = extractAggregationsAndProperties(b.left, tableEnv)
-        val r = extractAggregationsAndProperties(b.right, tableEnv)
-        (b.makeCopy(Array(l._1, r._1)),
-          l._2 ::: r._2,
-          l._3 ::: r._3)
+        val l = replaceAggregationsAndProperties(b.left, tableEnv, aggNames, propNames)
+        val r = replaceAggregationsAndProperties(b.right, tableEnv, aggNames, propNames)
+        b.makeCopy(Array(l, r))
 
      // Function calls
       case c @ Call(name, args) =>
-        val newArgs = args.map(extractAggregationsAndProperties(_, tableEnv))
-        (c.makeCopy((name :: newArgs.map(_._1) :: Nil).toArray),
-          newArgs.flatMap(_._2).toList,
-          newArgs.flatMap(_._3).toList)
+        val newArgs = args.map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames))
+        c.makeCopy(Array(name, newArgs))
 
       case sfc @ ScalarFunctionCall(clazz, args) =>
-        val newArgs = args.map(extractAggregationsAndProperties(_, tableEnv))
-        (sfc.makeCopy((clazz :: newArgs.map(_._1) :: Nil).toArray),
-          newArgs.flatMap(_._2).toList,
-          newArgs.flatMap(_._3).toList)
+        val newArgs: Seq[Expression] = args
+          .map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames))
+        sfc.makeCopy(Array(clazz, newArgs))
 
       // General expression
       case e: Expression =>
         val newArgs = e.productIterator.map {
           case arg: Expression =>
-            extractAggregationsAndProperties(arg, tableEnv)
+            replaceAggregationsAndProperties(arg, tableEnv, aggNames, propNames)
         }
-        (e.makeCopy(newArgs.map(_._1).toArray),
-          newArgs.flatMap(_._2).toList,
-          newArgs.flatMap(_._3).toList)
+        e.makeCopy(newArgs.toArray)
     }
   }
 
   /**
-    * Parses all input expressions to [[UnresolvedAlias]].
-    * And expands star to parent's full project list.
+    * Expands an UnresolvedFieldReference("*") to parent's full project list.
     */
-  def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): Seq[NamedExpression] = {
-    val projectList = new ListBuffer[NamedExpression]
+  def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): Seq[Expression] = {
+    val projectList = new ListBuffer[Expression]
     exprs.foreach {
       case n: UnresolvedFieldReference if n.name == "*" =>
-        projectList ++= parent.output.map(UnresolvedAlias(_))
-      case e: Expression => projectList += UnresolvedAlias(e)
+        projectList ++= parent.output.map(a => UnresolvedFieldReference(a.name))
+      case e: Expression => projectList += e
     }
     projectList
   }
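
The rework above splits the old single-pass extraction into two passes: a fold that assigns each distinct aggregation or window property a unique attribute name, and a rewrite that substitutes field references for those names. A minimal standalone sketch of the same dedup-by-fold pattern, using toy expression types rather than Flink's (all names here are illustrative):

    // Standalone sketch of the deduplication fold (toy types, not Flink's classes).
    object DedupSketch extends App {
      sealed trait Expr
      case class Agg(fun: String, field: String) extends Expr  // stands in for Aggregation
      case class Plus(left: Expr, right: Expr) extends Expr
      case class Field(name: String) extends Expr

      def identify(e: Expr, seen: Map[Expr, String]): Map[Expr, String] = e match {
        case a: Agg if seen.contains(a) => seen                // duplicate: keep existing name
        case a: Agg => seen + (a -> s"TMP_${seen.size}")       // first occurrence: assign a name
        case Plus(l, r) => identify(r, identify(l, seen))
        case _: Field => seen
      }

      def replace(e: Expr, seen: Map[Expr, String]): Expr = e match {
        case a: Agg => Field(seen(a))                          // substitute a field reference
        case Plus(l, r) => Plus(replace(l, seen), replace(r, seen))
        case f: Field => f
      }

      val exprs = Seq(Plus(Agg("sum", "int"), Field("one")), Agg("sum", "int"))
      val names = exprs.foldLeft(Map.empty[Expr, String])((m, e) => identify(e, m))
      val rewritten = exprs.map(replace(_, names))  // Agg("sum", "int") is named exactly once
      println(rewritten)
    }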

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
index b9b4561..b4ae3ab 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -22,19 +22,21 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.api.table.expressions._
 import org.apache.flink.api.table.plan.logical._
 import org.apache.flink.api.table.plan.nodes.FlinkAggregate
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream, createNonKeyedWindowedStream, transformToPropertyReads}
+import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate._
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.api.table.runtime.aggregate._
 import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
 import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, RowTypeInfo, TimeIntervalTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment}
+import org.apache.flink.api.table.{TableException, FlinkTypeFactory, Row, StreamTableEnvironment}
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction}
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
@@ -115,8 +117,6 @@ class DataStreamAggregate(
       getRowType,
       grouping)
 
-    val propertyReads = transformToPropertyReads(namedProperties.map(_.property))
-
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
       tableEnv,
       // tell the input operator that this operator currently only supports Rows as input
@@ -148,7 +148,8 @@ class DataStreamAggregate(
         val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
           s"window: ($window), " +
           s"select: ($aggString)"
-        val aggregateFunction = new AggregateWindowFunction(propertyReads, groupReduceFunction)
+        val aggregateFunction =
+          createWindowAggregationFunction(window, namedProperties, groupReduceFunction)
 
         val keyedStream = mappedInput.keyBy(groupingKeys: _*)
 
@@ -164,7 +165,8 @@ class DataStreamAggregate(
       // global / non-keyed aggregation
       else {
         val aggOpName = s"window: ($window), select: ($aggString)"
-        val aggregateFunction = new AggregateAllWindowFunction(propertyReads, groupReduceFunction)
+        val aggregateFunction =
+          createAllWindowAggregationFunction(window, namedProperties, groupReduceFunction)
 
         val windowedStream = createNonKeyedWindowedStream(window, mappedInput)
           .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
@@ -197,11 +199,69 @@ class DataStreamAggregate(
 
 object DataStreamAggregate {
 
-  private def transformToPropertyReads(namedProperties: Seq[WindowProperty])
-    : Array[WindowPropertyRead[_ <: Any]] =  namedProperties.map {
-      case WindowStart(_) => new WindowStartRead()
-      case WindowEnd(_) => new WindowEndRead()
-    }.toArray
+  private def createAllWindowAggregationFunction(
+      window: LogicalWindow,
+      properties: Seq[NamedWindowProperty],
+      aggFunction: RichGroupReduceFunction[Row, Row])
+    : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+        .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+    } else {
+      new AggregateAllWindowFunction(aggFunction)
+    }
+
+  }
+
+  private def createWindowAggregationFunction(
+      window: LogicalWindow,
+      properties: Seq[NamedWindowProperty],
+      aggFunction: RichGroupReduceFunction[Row, Row])
+    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+        .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+    } else {
+      new AggregateWindowFunction(aggFunction)
+    }
+
+  }
+
+  private def isTimeWindow(window: LogicalWindow) = {
+    window match {
+      case ProcessingTimeTumblingGroupWindow(_, size) => isTimeInterval(size.resultType)
+      case ProcessingTimeSlidingGroupWindow(_, size, _) => isTimeInterval(size.resultType)
+      case ProcessingTimeSessionGroupWindow(_, _) => true
+      case EventTimeTumblingGroupWindow(_, _, size) => isTimeInterval(size.resultType)
+      case EventTimeSlidingGroupWindow(_, _, size, _) => isTimeInterval(size.resultType)
+      case EventTimeSessionGroupWindow(_, _, _) => true
+    }
+  }
+
+  def computeWindowStartEndPropertyPos(properties: Seq[NamedWindowProperty])
+      : (Option[Int], Option[Int]) = {
+
+    val propPos = properties.foldRight((None: Option[Int], None: Option[Int], 0)) {
+      (p, x) => p match {
+        case NamedWindowProperty(name, prop) =>
+          prop match {
+            case WindowStart(_) if x._1.isDefined =>
+              throw new TableException("Duplicate WindowStart property encountered. This
is a bug.")
+            case WindowStart(_) =>
+              (Some(x._3), x._2, x._3 - 1)
+            case WindowEnd(_) if x._2.isDefined =>
+              throw new TableException("Duplicate WindowEnd property encountered. This is
a bug.")
+            case WindowEnd(_) =>
+              (x._1, Some(x._3), x._3 - 1)
+          }
+      }
+    }
+    (propPos._1, propPos._2)
+  }
 
  private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
     : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
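
To make the offset scheme in computeWindowStartEndPropertyPos concrete: the foldRight walks the property list from right to left, handing the rightmost property offset 0, the next one -1, and so on, all relative to the row's last field. A standalone rendering of that arithmetic with stub property types (illustrative, not Flink's classes):

    object WindowPosSketch extends App {
      sealed trait Prop
      case object Start extends Prop
      case object End extends Prop

      def positions(props: Seq[Prop]): (Option[Int], Option[Int]) = {
        val (s, e, _) = props.foldRight((Option.empty[Int], Option.empty[Int], 0)) {
          case (Start, (None, e, next)) => (Some(next), e, next - 1)
          case (End, (s, None, next))   => (s, Some(next), next - 1)
          case _                        => sys.error("duplicate window property")
        }
        (s, e)
      }

      // For Seq(Start, End): End (rightmost) gets offset 0, Start gets -1, so the
      // collector writes the start to the second-to-last and the end to the last field.
      assert(positions(Seq(Start, End)) == (Some(-1), Some(0)))
    }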

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
new file mode 100644
index 0000000..ceadfe7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+class AggregateAllTimeWindowFunction(
+    groupReduceFunction: RichGroupReduceFunction[Row, Row],
+    windowStartPos: Option[Int],
+    windowEndPos: Option[Int])
+
+  extends RichAllWindowFunction[Row, Row, TimeWindow] {
+
+  private var collector: TimeWindowPropertyCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    groupReduceFunction.open(parameters)
+    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+  }
+
+  override def apply(window: TimeWindow, input: Iterable[Row], out: Collector[Row]): Unit = {
+
+    // set collector and window
+    collector.wrappedCollector = out
+    collector.timeWindow = window
+
+    // call wrapped reduce function with property collector
+    groupReduceFunction.reduce(input, collector)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
index 86f8a20..53ab948 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
@@ -27,27 +27,14 @@ import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
-class AggregateAllWindowFunction(
-    propertyReads: Array[WindowPropertyRead[_ <: Any]],
-    groupReduceFunction: RichGroupReduceFunction[Row, Row])
-  extends RichAllWindowFunction[Row, Row, Window] {
-
-  private var propertyCollector: PropertyCollector = _
+class AggregateAllWindowFunction(groupReduceFunction: RichGroupReduceFunction[Row, Row])
+    extends RichAllWindowFunction[Row, Row, Window] {
 
   override def open(parameters: Configuration): Unit = {
     groupReduceFunction.open(parameters)
-    propertyCollector = new PropertyCollector(propertyReads)
   }
 
   override def apply(window: Window, input: Iterable[Row], out: Collector[Row]): Unit = {
-
-    // extract the properties from window
-    propertyReads.foreach(_.extract(window))
-
-    // set final collector
-    propertyCollector.finalCollector = out
-
-    // call wrapped reduce function with property collector
-    groupReduceFunction.reduce(input, propertyCollector)
+    groupReduceFunction.reduce(input, out)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
new file mode 100644
index 0000000..80f52ca
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+class AggregateTimeWindowFunction(
+    groupReduceFunction: RichGroupReduceFunction[Row, Row],
+    windowStartPos: Option[Int],
+    windowEndPos: Option[Int])
+  extends RichWindowFunction[Row, Row, Tuple, TimeWindow] {
+
+  private var collector: TimeWindowPropertyCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    groupReduceFunction.open(parameters)
+    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+  }
+
+  override def apply(
+      key: Tuple,
+      window: TimeWindow,
+      input: Iterable[Row],
+      out: Collector[Row]) : Unit = {
+
+    // set collector and window
+    collector.wrappedCollector = out
+    collector.timeWindow = window
+
+    // call wrapped reduce function with property collector
+    groupReduceFunction.reduce(input, collector)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
index c65ac35..180248f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
@@ -28,32 +28,19 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
-class AggregateWindowFunction(
-    propertyReads: Array[WindowPropertyRead[_ <: Any]],
-    groupReduceFunction: RichGroupReduceFunction[Row, Row])
+class AggregateWindowFunction(groupReduceFunction: RichGroupReduceFunction[Row, Row])
   extends RichWindowFunction[Row, Row, Tuple, Window] {
 
-  private var propertyCollector: PropertyCollector = _
-
   override def open(parameters: Configuration): Unit = {
     groupReduceFunction.open(parameters)
-    propertyCollector = new PropertyCollector(propertyReads)
   }
 
   override def apply(
       key: Tuple,
       window: Window,
       input: Iterable[Row],
-      out: Collector[Row])
-    : Unit = {
-
-    // extract the properties from window
-    propertyReads.foreach(_.extract(window))
-
-    // set final collector
-    propertyCollector.finalCollector = out
+      out: Collector[Row]) : Unit = {
 
-    // call wrapped reduce function with property collector
-    groupReduceFunction.reduce(input, propertyCollector)
+    groupReduceFunction.reduce(input, out)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala
deleted file mode 100644
index 763ed0b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import org.apache.flink.api.table.Row
-import org.apache.flink.util.Collector
-
-/**
-  * Adds properties to the end of a row before it emits it to the final collector.
-  * The collector assumes that the row has placeholders at the end that can be filled.
-  */
-class PropertyCollector(properties: Array[WindowPropertyRead[_ <: Any]]) extends Collector[Row] {
-  var finalCollector: Collector[Row] = _
-
-  override def collect(record: Row): Unit = {
-    var i: Int = 0
-    while (i < properties.length) {
-      val idx = record.productArity - properties.length + i
-      record.setField(idx, properties(i).get())
-      i = i + 1
-    }
-    finalCollector.collect(record)
-  }
-
-  override def close(): Unit = finalCollector.close()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
new file mode 100644
index 0000000..9f1c23b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.table.Row
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped
+  * collector.
+  */
+class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int])
+    extends Collector[Row] {
+
+  var wrappedCollector: Collector[Row] = _
+  var timeWindow: TimeWindow = _
+
+  override def collect(record: Row): Unit = {
+
+    val lastFieldPos = record.productArity - 1
+
+    if (windowStartOffset.isDefined) {
+      record.setField(
+        lastFieldPos + windowStartOffset.get,
+        SqlFunctions.internalToTimestamp(timeWindow.getStart))
+    }
+    if (windowEndOffset.isDefined) {
+      record.setField(
+        lastFieldPos + windowEndOffset.get,
+        SqlFunctions.internalToTimestamp(timeWindow.getEnd))
+    }
+    wrappedCollector.collect(record)
+  }
+
+  override def close(): Unit = wrappedCollector.close()
+}
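
The negative offsets produced by computeWindowStartEndPropertyPos are resolved here against the row's last field, so the caller is expected to pre-allocate the property slots at the end of the row. A rough mock of the arithmetic in collect() (a plain array instead of Row; the layout is an assumption for illustration):

    // Mock of the offset arithmetic: a row of arity 4 with two trailing
    // placeholder slots, fields = [key, count, <start slot>, <end slot>].
    val fields = new Array[Any](4)
    fields(0) = "Hi"; fields(1) = 1L

    val lastFieldPos = fields.length - 1       // 3
    val windowStartOffset = Some(-1)           // as computed for Seq(start, end)
    val windowEndOffset = Some(0)

    windowStartOffset.foreach(off => fields(lastFieldPos + off) = 0L)  // slot 2 <- window start
    windowEndOffset.foreach(off => fields(lastFieldPos + off) = 5L)    // slot 3 <- window end
    // fields is now Array("Hi", 1L, 0L, 5L)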

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowEndRead.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowEndRead.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowEndRead.scala
deleted file mode 100644
index dd12238..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowEndRead.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.sql.Timestamp
-
-import org.apache.calcite.runtime.SqlFunctions
-import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
-
-class WindowEndRead extends WindowPropertyRead[Timestamp] {
-
-  private var ts: Timestamp = _
-
-  override def extract(window: Window): Unit = window match {
-    case timeWindow: TimeWindow =>
-      ts = SqlFunctions.internalToTimestamp(timeWindow.getEnd)
-    case _ =>
-      ts = null
-  }
-
-  override def get(): Timestamp = ts
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowPropertyRead.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowPropertyRead.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowPropertyRead.scala
deleted file mode 100644
index 88ea30d..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowPropertyRead.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import org.apache.flink.streaming.api.windowing.windows.Window
-
-/**
-  * Base class for reading a window property. The property will be extracted once and
-  * can be read multiple times.
-  */
-trait WindowPropertyRead[T] extends Serializable {
-
-  def extract(window: Window): Unit
-
-  def get(): T
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowStartRead.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowStartRead.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowStartRead.scala
deleted file mode 100644
index d92cb18..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowStartRead.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.sql.Timestamp
-
-import org.apache.calcite.runtime.SqlFunctions
-import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
-
-class WindowStartRead extends WindowPropertyRead[Timestamp] {
-
-  private var ts: Timestamp = _
-
-  override def extract(window: Window): Unit = window match {
-    case timeWindow: TimeWindow =>
-      ts = SqlFunctions.internalToTimestamp(timeWindow.getStart)
-    case _ =>
-      ts = null
-  }
-
-  override def get(): Timestamp = ts
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index ebb4dcb..8528c8a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -76,16 +76,21 @@ class Table(
     * }}}
     */
   def select(fields: Expression*): Table = {
-    val projectionOnAggregates = fields.map(extractAggregationsAndProperties(_, tableEnv))
-    val aggregations = projectionOnAggregates.flatMap(_._2)
-    val projectList = expandProjectList(projectionOnAggregates.map(_._1), logicalPlan)
-    if (aggregations.nonEmpty) {
+
+    val expandedFields = expandProjectList(fields, logicalPlan)
+    val (projection, aggs, props) = extractAggregationsAndProperties(expandedFields, tableEnv)
+
+    if (props.nonEmpty) {
+      throw ValidationException("Window properties can only be used on windowed tables.")
+    }
+
+    if (aggs.nonEmpty) {
       new Table(tableEnv,
-        Project(projectList,
-          Aggregate(Nil, aggregations, logicalPlan).validate(tableEnv)).validate(tableEnv))
+        Project(projection,
+          Aggregate(Nil, aggs, logicalPlan).validate(tableEnv)).validate(tableEnv))
     } else {
       new Table(tableEnv,
-        Project(projectList, logicalPlan).validate(tableEnv))
+        Project(projection, logicalPlan).validate(tableEnv))
     }
   }
 
@@ -672,20 +677,18 @@ class GroupedTable(
     */
   def select(fields: Expression*): Table = {
 
-    val projectionOnAggsAndProps = fields.map(extractAggregationsAndProperties(_, table.tableEnv))
-    val aggregations = projectionOnAggsAndProps.flatMap(_._2)
-    val properties = projectionOnAggsAndProps.flatMap(_._3)
+    val (projection, aggs, props) = extractAggregationsAndProperties(fields, table.tableEnv)
 
-    if (properties.nonEmpty) {
+    if (props.nonEmpty) {
       throw ValidationException("Window properties can only be used on windowed tables.")
     }
 
     val logical =
       Project(
-        projectionOnAggsAndProps.map(e => UnresolvedAlias(e._1)),
+        projection,
         Aggregate(
           groupKey,
-          aggregations,
+          aggs,
           table.logicalPlan
         ).validate(table.tableEnv)
       ).validate(table.tableEnv)
@@ -744,20 +747,19 @@ class GroupWindowedTable(
     * }}}
     */
   def select(fields: Expression*): Table = {
-    val projectionOnAggsAndProps = fields.map(extractAggregationsAndProperties(_, table.tableEnv))
-    val aggregations = projectionOnAggsAndProps.flatMap(_._2)
-    val properties = projectionOnAggsAndProps.flatMap(_._3)
+
+    val (projection, aggs, props) = extractAggregationsAndProperties(fields, table.tableEnv)
 
     val groupWindow = window.toLogicalWindow
 
     val logical =
       Project(
-        projectionOnAggsAndProps.map(e => UnresolvedAlias(e._1)),
+        projection,
         WindowAggregate(
           groupKey,
           groupWindow,
-          properties,
-          aggregations,
+          props,
+          aggs,
           table.logicalPlan
         ).validate(table.tableEnv)
       ).validate(table.tableEnv)
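
With this change all three select() variants share the extraction path, and plain (non-windowed) tables now reject window properties during validation rather than failing later in planning. Illustrative only (flink-table 1.x Scala API, not compiled here):

    // Selecting a window property on a non-windowed table now fails fast with
    // ValidationException("Window properties can only be used on windowed tables.")
    val result = table.select('string, 'w.start)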

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
index 6bb513e..2ccbb38 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
@@ -160,21 +160,21 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
     val windowedTable = table
       .groupBy('string)
       .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
-      .select('string, 'int.count, 'w.start, 'w.end)
+      .select('string, 'int.count, 'w.start, 'w.end, 'w.start)
 
     val results = windowedTable.toDataStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
     val expected = Seq(
-      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
-      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
-      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02",
-      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025",
-      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
-      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
-      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01")
+      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
+      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015,1970-01-01 00:00:00.005",
+      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02,1970-01-01 00:00:00.01",
+      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025,1970-01-01 00:00:00.015",
+      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
+      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
index 96fd787..b59b151 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
@@ -586,7 +586,7 @@ class GroupWindowTest extends TableTestBase {
     val windowedTable = table
       .groupBy('string)
       .window(Session withGap 3.milli on 'rowtime as 'w)
-      .select('w.end, 'string, 'int.count, 'w.start, 'w.end)
+      .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2)
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -603,10 +603,50 @@ class GroupWindowTest extends TableTestBase {
           "string",
           "COUNT(int) AS TMP_1",
           "end(WindowReference(w)) AS TMP_0",
-          "start(WindowReference(w)) AS TMP_2",
-          "end(WindowReference(w)) AS TMP_3")
+          "start(WindowReference(w)) AS TMP_2")
       ),
-      term("select", "TMP_0", "string", "TMP_1", "TMP_2", "TMP_3")
+      term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testTumbleWindowWithDuplicateAggsAndProps(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 5.millis on 'rowtime as 'w)
+      .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2,
+        'w.end as 'x3, 'w.end)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamAggregate",
+        streamTableNode(0),
+        term("groupBy", "string"),
+        term("window",
+          EventTimeTumblingGroupWindow(
+            Some(WindowReference("w")),
+            RowtimeAttribute(),
+            5.millis)),
+        term("select",
+          "string",
+          "SUM(int) AS TMP_0",
+          "start(WindowReference(w)) AS TMP_1",
+          "end(WindowReference(w)) AS TMP_2")
+      ),
+      term("select",
+        "string",
+        "+(CAST(AS(TMP_0, 'TMP_3')), CAST(1)) AS s1",
+        "+(CAST(AS(TMP_0, 'TMP_4')), CAST(3)) AS s2",
+        "TMP_1 AS x",
+        "TMP_1 AS x2",
+        "TMP_2 AS x3",
+        "TMP_2 AS TMP_5")
     )
 
     util.verifyTable(windowedTable, expected)

