flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [3/3] flink git commit: [FLINK-6697] [table] Add support for group window ROWTIME to batch SQL & Table API.
Date Wed, 25 Oct 2017 20:02:16 GMT
[FLINK-6697] [table] Add support for group window ROWTIME to batch SQL & Table API.

- Fixes [FLINK-7542] "AggregateITCase fails in different timezone."

This closes #4796.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/babee277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/babee277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/babee277

Branch: refs/heads/master
Commit: babee277204b6b1edcfe9c7c76348254019b2dd3
Parents: 14e0948
Author: Fabian Hueske <fhueske@apache.org>
Authored: Sun Aug 6 23:55:56 2017 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Oct 25 22:01:39 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/tableApi.md                      |  16 +--
 .../calcite/sql/fun/SqlGroupFunction.java       |  20 +++
 .../table/expressions/fieldExpression.scala     |  25 +++-
 .../rules/common/WindowPropertiesRule.scala     |  70 ++++++----
 .../DataSetLogicalWindowAggregateRule.scala     |   7 +-
 .../DataStreamLogicalWindowAggregateRule.scala  |   7 +-
 .../table/runtime/aggregate/AggregateUtil.scala |  12 +-
 ...SetSessionWindowAggReduceGroupFunction.scala |   8 +-
 ...SetSlideWindowAggReduceCombineFunction.scala |   4 +
 ...taSetSlideWindowAggReduceGroupFunction.scala |  14 +-
 ...mbleTimeWindowAggReduceCombineFunction.scala |   4 +
 ...TumbleTimeWindowAggReduceGroupFunction.scala |  10 +-
 ...rementalAggregateAllTimeWindowFunction.scala |   4 +-
 ...IncrementalAggregateTimeWindowFunction.scala |   4 +-
 .../aggregate/TimeWindowPropertyCollector.scala |  18 ++-
 .../flink/table/validate/FunctionCatalog.scala  |  27 +++-
 .../table/api/batch/sql/GroupWindowTest.scala   |  24 ++--
 .../validation/GroupWindowValidationTest.scala  |  17 +--
 .../table/api/batch/table/GroupWindowTest.scala | 134 +++++++++++++++++++
 .../runtime/batch/sql/AggregateITCase.scala     | 123 +++++++++++++++--
 .../runtime/batch/table/GroupWindowITCase.scala |  97 ++++++++------
 .../table/runtime/stream/sql/SqlITCase.scala    |   4 +-
 .../stream/table/GroupWindowITCase.scala        |  26 ++--
 23 files changed, 511 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f0c4605..2b45b5a 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -344,7 +344,7 @@ Table orders = tableEnv.scan("Orders");
 Table result = orders
     .window(Tumble.over("5.minutes").on("rowtime").as("w")) // define window
     .groupBy("a, w") // group by key and window
-    .select("a, w.start, w.end, b.sum as d"); // access window properties and aggregate
+    .select("a, w.start, w.end, w.rowtime, b.sum as d"); // access window properties and aggregate
 {% endhighlight %}
       </td>
     </tr>
@@ -427,7 +427,7 @@ val orders: Table = tableEnv.scan("Orders")
 val result: Table = orders
     .window(Tumble over 5.minutes on 'rowtime as 'w) // define window
     .groupBy('a, 'w) // group by key and window
-    .select('a, w.start, 'w.end, 'b.sum as 'd) // access window properties and aggregate
+    .select('a, w.start, 'w.end, 'w.rowtime, 'b.sum as 'd) // access window properties and aggregate
 {% endhighlight %}
       </td>
     </tr>
@@ -1178,7 +1178,7 @@ val table = input
 </div>
 </div>
 
-Window properties such as the start and end timestamp of a time window can be added in the select statement as a property of the window alias as `w.start` and `w.end`, respectively.
+Window properties such as the start, end, or rowtime timestamp of a time window can be added in the select statement as a property of the window alias as `w.start`, `w.end`, and `w.rowtime`, respectively. The window start and rowtime timestamps are the inclusive lower and uppper window boundaries. In contrast, the window end timestamp is the exclusive upper window boundary. For example a tumbling window of 30 minutes that starts at 2pm would have `14:00:00.000` as start timestamp, `14:29:59.999` as rowtime timestamp, and `14:30:00.000` as end timestamp.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1186,7 +1186,7 @@ Window properties such as the start and end timestamp of a time window can be ad
 Table table = input
   .window([Window w].as("w"))  // define window with alias w
   .groupBy("w, a")  // group the table by attribute a and window w 
-  .select("a, w.start, w.end, b.count"); // aggregate and add window start and end timestamps
+  .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
 {% endhighlight %}
 </div>
 
@@ -1195,7 +1195,7 @@ Table table = input
 val table = input
   .window([w: Window] as 'w)  // define window with alias w
   .groupBy('w, 'a)  // group the table by attribute a and window w 
-  .select('a, 'w.start, 'w.end, 'b.count) // aggregate and add window start and end timestamps
+  .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps
 {% endhighlight %}
 </div>
 </div>
@@ -1227,7 +1227,7 @@ Tumbling windows are defined by using the `Tumble` class as follows:
     </tr>
     <tr>
       <td><code>as</code></td>
-      <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td>
+      <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start, end, or rowtime timestamps in the <code>select()</code> clause.</td>
     </tr>
   </tbody>
 </table>
@@ -1289,7 +1289,7 @@ Sliding windows are defined by using the `Slide` class as follows:
     </tr>
     <tr>
       <td><code>as</code></td>
-      <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td>
+      <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start, end, or rowtime timestamps in the <code>select()</code> clause.</td>
     </tr>
   </tbody>
 </table>
@@ -1347,7 +1347,7 @@ A session window is defined by using the `Session` class as follows:
     </tr>
     <tr>
       <td><code>as</code></td>
-      <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td>
+      <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start, end, or rowtime timestamps in the <code>select()</code> clause.</td>
     </tr>
   </tbody>
 </table>

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
index fd5ddf9..0bb26da 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java
@@ -22,6 +22,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperatorBinding;
 import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
 
 import com.google.common.collect.ImmutableList;
@@ -84,6 +85,25 @@ public class SqlGroupFunction extends SqlFunction {
     this(kind.name(), kind, groupFunction, operandTypeChecker);
   }
 
+	/** Creates a SqlGroupFunction.
+	 *
+	 * @param name Function name
+	 * @param kind Kind
+	 * @param groupFunction Group function, if this is an auxiliary;
+	 *                      null, if this is a group function
+	 * @param returnTypeInference Inference of the functions return type
+	 * @param operandTypeChecker Operand type checker
+	 */
+  public SqlGroupFunction(String name, SqlKind kind, SqlGroupFunction groupFunction,
+      SqlReturnTypeInference returnTypeInference, SqlOperandTypeChecker operandTypeChecker) {
+    super(name, kind, returnTypeInference, null, operandTypeChecker,
+      SqlFunctionCategory.SYSTEM);
+    this.groupFunction = groupFunction;
+    if (groupFunction != null) {
+      assert groupFunction.groupFunction == null;
+    }
+  }
+
   /** Creates an auxiliary function from this grouped window function.
    *
    * @param kind Kind; also determines function name

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index dab9ce3..bad5889 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -20,9 +20,9 @@ package org.apache.flink.table.expressions
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Table, UnresolvedException, ValidationException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType, isTimeIndicatorType}
+import org.apache.flink.table.calcite.FlinkTypeFactory._
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
@@ -163,9 +163,13 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
 
   override private[flink] def validateInput(): ValidationResult = {
     child match {
-      case WindowReference(_, Some(tpe)) if !isRowtimeIndicatorType(tpe) =>
+      case WindowReference(_, Some(tpe)) if isProctimeIndicatorType(tpe) =>
         ValidationFailure("A proctime window cannot provide a rowtime attribute.")
       case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) =>
+        // rowtime window
+        ValidationSuccess
+      case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP =>
+        // batch time window
         ValidationSuccess
       case WindowReference(_, _) =>
         ValidationFailure("Reference to a rowtime or proctime window required.")
@@ -175,8 +179,19 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
     }
   }
 
-  override def resultType: TypeInformation[_] =
-    TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+  override def resultType: TypeInformation[_] = {
+    child match {
+      case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) =>
+        // rowtime window
+        TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+      case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP =>
+        // batch time window
+        Types.SQL_TIMESTAMP
+      case _ =>
+        throw TableException("WindowReference of RowtimeAttribute has invalid type. " +
+          "Please report this bug.")
+    }
+  }
 
   override def toNamedWindowProperty(name: String): NamedWindowProperty =
     NamedWindowProperty(name, this)

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala
index c228528..74f7b1b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject}
 import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.api.{TableException, Types, ValidationException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.logical.LogicalWindow
@@ -59,23 +59,24 @@ abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleNa
       agg: LogicalWindowAggregate): RelNode = {
 
     val w = agg.getWindow
-
-    val isRowtime = ExpressionUtils.isRowtimeAttribute(w.timeAttribute)
-    val isProctime = ExpressionUtils.isProctimeAttribute(w.timeAttribute)
+    val windowType = getWindowType(w)
 
     val startEndProperties = Seq(
       NamedWindowProperty(propertyName(w, "start"), WindowStart(w.aliasAttribute)),
       NamedWindowProperty(propertyName(w, "end"), WindowEnd(w.aliasAttribute)))
 
     // allow rowtime/proctime for rowtime windows and proctime for proctime windows
-    val timeProperties = if (isRowtime) {
-      Seq(
-        NamedWindowProperty(propertyName(w, "rowtime"), RowtimeAttribute(w.aliasAttribute)),
-        NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute)))
-    } else if (isProctime) {
-      Seq(NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute)))
-    } else {
-      Seq()
+    val timeProperties = windowType match {
+      case 'streamRowtime =>
+        Seq(
+          NamedWindowProperty(propertyName(w, "rowtime"), RowtimeAttribute(w.aliasAttribute)),
+          NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute)))
+      case 'streamProctime =>
+        Seq(NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute)))
+      case 'batchRowtime =>
+        Seq(NamedWindowProperty(propertyName(w, "rowtime"), RowtimeAttribute(w.aliasAttribute)))
+      case _ =>
+        throw new TableException("Unknown window type encountered. Please report this bug.")
     }
 
     val properties = startEndProperties ++ timeProperties
@@ -103,6 +104,18 @@ abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleNa
     builder.build()
   }
 
+  private def getWindowType(window: LogicalWindow): Symbol = {
+    if (ExpressionUtils.isRowtimeAttribute(window.timeAttribute)) {
+      'streamRowtime
+    } else if (ExpressionUtils.isProctimeAttribute(window.timeAttribute)) {
+      'streamProctime
+    } else if (window.timeAttribute.resultType == Types.SQL_TIMESTAMP) {
+      'batchRowtime
+    } else {
+      throw new TableException("Unknown window type encountered. Please report this bug.")
+    }
+  }
+
   /** Generates a property name for a window. */
   private def propertyName(window: LogicalWindow, name: String): String = {
     window.aliasAttribute.asInstanceOf[WindowReference].name + name
@@ -115,9 +128,7 @@ abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleNa
       builder: RelBuilder): RexNode = {
 
     val rexBuilder = builder.getRexBuilder
-
-    val isRowtime = ExpressionUtils.isRowtimeAttribute(window.timeAttribute)
-    val isProctime = ExpressionUtils.isProctimeAttribute(window.timeAttribute)
+    val windowType = getWindowType(window)
 
     node match {
       case c: RexCall if isWindowStart(c) =>
@@ -129,22 +140,25 @@ abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleNa
         rexBuilder.makeCast(c.getType, builder.field(propertyName(window, "end")), false)
 
       case c: RexCall if isWindowRowtime(c) =>
-        if (isProctime) {
-          throw ValidationException("A proctime window cannot provide a rowtime attribute.")
-        } else if (isRowtime) {
-          // replace expression by access to window rowtime
-          builder.field(propertyName(window, "rowtime"))
-        } else {
-          throw TableException("Accessing the rowtime attribute of a window is not yet " +
-            "supported in a batch environment.")
+        windowType match {
+          case 'streamRowtime | 'batchRowtime =>
+            // replace expression by access to window rowtime
+            builder.field(propertyName(window, "rowtime"))
+          case 'streamProctime =>
+            throw ValidationException("A proctime window cannot provide a rowtime attribute.")
+          case _ =>
+            throw new TableException("Unknown window type encountered. Please report this bug.")
         }
 
       case c: RexCall if isWindowProctime(c) =>
-        if (isProctime || isRowtime) {
-          // replace expression by access to window proctime
-          builder.field(propertyName(window, "proctime"))
-        } else {
-          throw ValidationException("Proctime is not supported in a batch environment.")
+        windowType match {
+          case 'streamProctime | 'streamRowtime =>
+            // replace expression by access to window proctime
+            builder.field(propertyName(window, "proctime"))
+          case 'batchRowtime =>
+            throw ValidationException("PROCTIME window property is not supported in batch queries.")
+          case _ =>
+            throw new TableException("Unknown window type encountered. Please report this bug.")
         }
 
       case c: RexCall =>

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
index 129e0d3..346eefa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
@@ -67,23 +67,24 @@ class DataSetLogicalWindowAggregateRule
       }
     }
 
+    val timeField = getFieldReference(windowExpr.getOperands.get(0))
     windowExpr.getOperator match {
       case BasicOperatorTable.TUMBLE =>
         val interval = getOperandAsLong(windowExpr, 1)
         val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-        w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$"))
+        w.on(timeField).as(WindowReference("w$", Some(timeField.resultType)))
 
       case BasicOperatorTable.HOP =>
         val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2))
         val w = Slide
           .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
           .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-        w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$"))
+        w.on(timeField).as(WindowReference("w$", Some(timeField.resultType)))
 
       case BasicOperatorTable.SESSION =>
         val gap = getOperandAsLong(windowExpr, 1)
         val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-        w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$"))
+        w.on(timeField).as(WindowReference("w$", Some(timeField.resultType)))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index eaad885..254f36b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -23,7 +23,6 @@ import java.math.{BigDecimal => JBigDecimal}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
 import org.apache.flink.table.api.{TableException, ValidationException, Window}
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -89,7 +88,7 @@ class DataStreamLogicalWindowAggregateRule
         val interval = getOperandAsLong(windowExpr, 1)
         val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
-        w.on(time).as(WindowReference("w$"))
+        w.on(time).as(WindowReference("w$", Some(time.resultType)))
 
       case BasicOperatorTable.HOP =>
         val time = getOperandAsTimeIndicator(windowExpr, 0)
@@ -98,14 +97,14 @@ class DataStreamLogicalWindowAggregateRule
           .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
           .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
-        w.on(time).as(WindowReference("w$"))
+        w.on(time).as(WindowReference("w$", Some(time.resultType)))
 
       case BasicOperatorTable.SESSION =>
         val time = getOperandAsTimeIndicator(windowExpr, 0)
         val gap = getOperandAsLong(windowExpr, 1)
         val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
-        w.on(time).as(WindowReference("w$"))
+        w.on(time).as(WindowReference("w$", Some(time.resultType)))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index ce13cdc..bdfdbf5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -595,7 +595,7 @@ object AggregateUtil {
     window match {
       case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
         // tumbling time window
-        val (startPos, endPos, _) = computeWindowPropertyPos(properties)
+        val (startPos, endPos, timePos) = computeWindowPropertyPos(properties)
         if (doAllSupportPartialMerge(aggregates)) {
           // for incremental aggregations
           new DataSetTumbleTimeWindowAggReduceCombineFunction(
@@ -604,6 +604,7 @@ object AggregateUtil {
             asLong(size),
             startPos,
             endPos,
+            timePos,
             keysAndAggregatesArity)
         }
         else {
@@ -613,6 +614,7 @@ object AggregateUtil {
             asLong(size),
             startPos,
             endPos,
+            timePos,
             outputType.getFieldCount)
         }
       case TumblingGroupWindow(_, _, size) =>
@@ -622,17 +624,18 @@ object AggregateUtil {
           asLong(size))
 
       case SessionGroupWindow(_, _, gap) =>
-        val (startPos, endPos, _) = computeWindowPropertyPos(properties)
+        val (startPos, endPos, timePos) = computeWindowPropertyPos(properties)
         new DataSetSessionWindowAggReduceGroupFunction(
           genFinalAggFunction,
           keysAndAggregatesArity,
           startPos,
           endPos,
+          timePos,
           asLong(gap),
           isInputCombined)
 
       case SlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) =>
-        val (startPos, endPos, _) = computeWindowPropertyPos(properties)
+        val (startPos, endPos, timePos) = computeWindowPropertyPos(properties)
         if (doAllSupportPartialMerge(aggregates)) {
           // for partial aggregations
           new DataSetSlideWindowAggReduceCombineFunction(
@@ -641,6 +644,7 @@ object AggregateUtil {
             keysAndAggregatesArity,
             startPos,
             endPos,
+            timePos,
             asLong(size))
         }
         else {
@@ -650,6 +654,7 @@ object AggregateUtil {
             keysAndAggregatesArity,
             startPos,
             endPos,
+            timePos,
             asLong(size))
         }
 
@@ -659,6 +664,7 @@ object AggregateUtil {
             keysAndAggregatesArity,
             None,
             None,
+            None,
             asLong(size))
 
       case _ =>

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
index d99ca31..372fc0d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -44,6 +44,7 @@ import org.apache.flink.util.Collector
   * @param keysAndAggregatesArity    The total arity of keys and aggregates
   * @param finalRowWindowStartPos The relative window-start field position.
   * @param finalRowWindowEndPos   The relative window-end field position.
+  * @param finalRowWindowRowtimePos The relative window-rowtime field position.
   * @param gap                    Session time window gap.
   */
 class DataSetSessionWindowAggReduceGroupFunction(
@@ -51,13 +52,14 @@ class DataSetSessionWindowAggReduceGroupFunction(
     keysAndAggregatesArity: Int,
     finalRowWindowStartPos: Option[Int],
     finalRowWindowEndPos: Option[Int],
+    finalRowWindowRowtimePos: Option[Int],
     gap: Long,
     isInputCombined: Boolean)
   extends RichGroupReduceFunction[Row, Row]
     with Compiler[GeneratedAggregations]
     with Logging {
 
-  private var collector: RowTimeWindowPropertyCollector = _
+  private var collector: DataSetTimeWindowPropertyCollector = _
   private val intermediateRowWindowStartPos = keysAndAggregatesArity
   private val intermediateRowWindowEndPos = keysAndAggregatesArity + 1
 
@@ -78,10 +80,10 @@ class DataSetSessionWindowAggReduceGroupFunction(
 
     output = function.createOutputRow()
     accumulators = function.createAccumulators()
-    collector = new RowTimeWindowPropertyCollector(
+    collector = new DataSetTimeWindowPropertyCollector(
       finalRowWindowStartPos,
       finalRowWindowEndPos,
-      None)
+      finalRowWindowRowtimePos)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
index 381d443..2da838f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
@@ -36,6 +36,8 @@ import org.apache.flink.types.Row
   * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param finalRowWindowStartPos relative window-start position to last field of output row
   * @param finalRowWindowEndPos relative window-end position to last field of output row
+  * @param finalRowWindowRowtimePos relative window-rowtime position to the last field of the
+  *                                 output row
   * @param windowSize size of the window, used to determine window-end for output row
   */
 class DataSetSlideWindowAggReduceCombineFunction(
@@ -44,12 +46,14 @@ class DataSetSlideWindowAggReduceCombineFunction(
     keysAndAggregatesArity: Int,
     finalRowWindowStartPos: Option[Int],
     finalRowWindowEndPos: Option[Int],
+    finalRowWindowRowtimePos: Option[Int],
     windowSize: Long)
   extends DataSetSlideWindowAggReduceGroupFunction(
     genFinalAggregations,
     keysAndAggregatesArity,
     finalRowWindowStartPos,
     finalRowWindowEndPos,
+    finalRowWindowRowtimePos,
     windowSize)
   with CombineFunction[Row, Row] {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
index c64a522..474a09b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
@@ -36,6 +36,8 @@ import org.apache.flink.util.Collector
   * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param finalRowWindowStartPos relative window-start position to last field of output row
   * @param finalRowWindowEndPos relative window-end position to last field of output row
+  * @param finalRowWindowRowtimePos relative window-rowtime position to the last field of the
+  *                                 output row
   * @param windowSize size of the window, used to determine window-end for output row
   */
 class DataSetSlideWindowAggReduceGroupFunction(
@@ -43,12 +45,13 @@ class DataSetSlideWindowAggReduceGroupFunction(
     keysAndAggregatesArity: Int,
     finalRowWindowStartPos: Option[Int],
     finalRowWindowEndPos: Option[Int],
+    finalRowWindowRowtimePos: Option[Int],
     windowSize: Long)
   extends RichGroupReduceFunction[Row, Row]
     with Compiler[GeneratedAggregations]
     with Logging {
 
-  private var collector: RowTimeWindowPropertyCollector = _
+  private var collector: DataSetTimeWindowPropertyCollector = _
   protected val windowStartPos: Int = keysAndAggregatesArity
 
   private var output: Row = _
@@ -68,10 +71,10 @@ class DataSetSlideWindowAggReduceGroupFunction(
 
     output = function.createOutputRow()
     accumulators = function.createAccumulators()
-    collector = new RowTimeWindowPropertyCollector(
+    collector = new DataSetTimeWindowPropertyCollector(
       finalRowWindowStartPos,
       finalRowWindowEndPos,
-      None)
+      finalRowWindowRowtimePos)
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
@@ -93,7 +96,10 @@ class DataSetSlideWindowAggReduceGroupFunction(
     function.setAggregationResults(accumulators, output)
 
     // adds TimeWindow properties to output then emit output
-    if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
+    if (finalRowWindowStartPos.isDefined ||
+        finalRowWindowEndPos.isDefined ||
+        finalRowWindowRowtimePos.isDefined) {
+
       collector.wrappedCollector = out
       collector.windowStart = record.getField(windowStartPos).asInstanceOf[Long]
       collector.windowEnd = collector.windowStart + windowSize

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
index 4a459b2..9eeab33 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
@@ -37,6 +37,8 @@ import org.apache.flink.types.Row
   *                               output row
   * @param windowEndPos           The relative window-end field position to the last field of
   *                               output row
+  * @param windowRowtimePos       The relative window-rowtime field position to the last field of
+  *                               output row
   * @param keysAndAggregatesArity The total arity of keys and aggregates
   */
 class DataSetTumbleTimeWindowAggReduceCombineFunction(
@@ -45,12 +47,14 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
     windowSize: Long,
     windowStartPos: Option[Int],
     windowEndPos: Option[Int],
+    windowRowtimePos: Option[Int],
     keysAndAggregatesArity: Int)
   extends DataSetTumbleTimeWindowAggReduceGroupFunction(
     genFinalAggregations,
     windowSize,
     windowStartPos,
     windowEndPos,
+    windowRowtimePos,
     keysAndAggregatesArity)
     with CombineFunction[Row, Row] {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index 7ae4c17..4e92148 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -35,6 +35,8 @@ import org.apache.flink.util.Collector
   * @param windowSize       Tumbling time window size
   * @param windowStartPos   The relative window-start field position to the last field of output row
   * @param windowEndPos     The relative window-end field position to the last field of output row
+  * @param windowRowtimePos The relative window-rowtime field position to the last field of
+  *                         output row
   * @param keysAndAggregatesArity    The total arity of keys and aggregates
   */
 class DataSetTumbleTimeWindowAggReduceGroupFunction(
@@ -42,12 +44,13 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
     windowSize: Long,
     windowStartPos: Option[Int],
     windowEndPos: Option[Int],
+    windowRowtimePos: Option[Int],
     keysAndAggregatesArity: Int)
   extends RichGroupReduceFunction[Row, Row]
     with Compiler[GeneratedAggregations]
     with Logging {
 
-  private var collector: RowTimeWindowPropertyCollector = _
+  private var collector: DataSetTimeWindowPropertyCollector = _
   protected var aggregateBuffer: Row = new Row(keysAndAggregatesArity + 1)
 
   private var output: Row = _
@@ -67,7 +70,10 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
 
     output = function.createOutputRow()
     accumulators = function.createAccumulators()
-    collector = new RowTimeWindowPropertyCollector(windowStartPos, windowEndPos, None)
+    collector = new DataSetTimeWindowPropertyCollector(
+      windowStartPos,
+      windowEndPos,
+      windowRowtimePos)
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
index 3c2e858..818418e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
@@ -42,10 +42,10 @@ class IncrementalAggregateAllTimeWindowFunction(
   extends IncrementalAggregateAllWindowFunction[TimeWindow](
     finalRowArity) {
 
-  private var collector: CRowTimeWindowPropertyCollector = _
+  private var collector: DataStreamTimeWindowPropertyCollector = _
 
   override def open(parameters: Configuration): Unit = {
-    collector = new CRowTimeWindowPropertyCollector(
+    collector = new DataStreamTimeWindowPropertyCollector(
       windowStartOffset,
       windowEndOffset,
       windowRowtimeOffset)

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
index 69e4f7b..a908f49 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
@@ -48,10 +48,10 @@ class IncrementalAggregateTimeWindowFunction(
     numAggregates,
     finalRowArity) {
 
-  private var collector: CRowTimeWindowPropertyCollector = _
+  private var collector: DataStreamTimeWindowPropertyCollector = _
 
   override def open(parameters: Configuration): Unit = {
-    collector = new CRowTimeWindowPropertyCollector(
+    collector = new DataStreamTimeWindowPropertyCollector(
       windowStartOffset,
       windowEndOffset,
       windowRowtimeOffset)

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
index 16e4a0b..e9ecf0f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
@@ -40,6 +40,8 @@ abstract class TimeWindowPropertyCollector[T](
 
   def getRow(record: T): Row
 
+  def setRowtimeAttribute(pos: Int): Unit
+
   override def collect(record: T): Unit = {
 
     output = getRow(record)
@@ -57,9 +59,7 @@ abstract class TimeWindowPropertyCollector[T](
     }
 
     if (windowRowtimeOffset.isDefined) {
-      output.setField(
-        lastFieldPos + windowRowtimeOffset.get,
-        windowEnd - 1)
+      setRowtimeAttribute(lastFieldPos + windowRowtimeOffset.get)
     }
 
     wrappedCollector.collect(record)
@@ -68,20 +68,28 @@ abstract class TimeWindowPropertyCollector[T](
   override def close(): Unit = wrappedCollector.close()
 }
 
-class RowTimeWindowPropertyCollector(
+final class DataSetTimeWindowPropertyCollector(
     startOffset: Option[Int],
     endOffset: Option[Int],
     rowtimeOffset: Option[Int])
   extends TimeWindowPropertyCollector[Row](startOffset, endOffset, rowtimeOffset) {
 
   override def getRow(record: Row): Row = record
+
+  override def setRowtimeAttribute(pos: Int): Unit = {
+    output.setField(pos, SqlFunctions.internalToTimestamp(windowEnd - 1))
+  }
 }
 
-class CRowTimeWindowPropertyCollector(
+final class DataStreamTimeWindowPropertyCollector(
     startOffset: Option[Int],
     endOffset: Option[Int],
     rowtimeOffset: Option[Int])
   extends TimeWindowPropertyCollector[CRow](startOffset, endOffset, rowtimeOffset) {
 
   override def getRow(record: CRow): Row = record.row
+
+  override def setRowtimeAttribute(pos: Int): Unit = {
+    output.setField(pos, windowEnd - 1)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 6c6be0b..535fd6e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.validate
 
-import org.apache.calcite.sql.`type`.OperandTypes
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeTransforms}
 import org.apache.calcite.sql.fun.{SqlGroupFunction, SqlStdOperatorTable}
 import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable}
 import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlOperator, SqlOperatorTable}
@@ -442,7 +442,13 @@ object BasicOperatorTable {
   val TUMBLE_START: SqlGroupFunction = TUMBLE.auxiliary(SqlKind.TUMBLE_START)
   val TUMBLE_END: SqlGroupFunction = TUMBLE.auxiliary(SqlKind.TUMBLE_END)
   val TUMBLE_ROWTIME: SqlGroupFunction =
-    TUMBLE.auxiliary("TUMBLE_ROWTIME", SqlKind.OTHER_FUNCTION)
+    new SqlGroupFunction(
+      "TUMBLE_ROWTIME",
+      SqlKind.OTHER_FUNCTION,
+      TUMBLE,
+      // ensure that returned rowtime is always NOT_NULLABLE
+      ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NOT_NULLABLE),
+      TUMBLE.getOperandTypeChecker)
   val TUMBLE_PROCTIME: SqlGroupFunction =
     TUMBLE.auxiliary("TUMBLE_PROCTIME", SqlKind.OTHER_FUNCTION)
 
@@ -461,7 +467,14 @@ object BasicOperatorTable {
   }
   val HOP_START: SqlGroupFunction = HOP.auxiliary(SqlKind.HOP_START)
   val HOP_END: SqlGroupFunction = HOP.auxiliary(SqlKind.HOP_END)
-  val HOP_ROWTIME: SqlGroupFunction = HOP.auxiliary("HOP_ROWTIME", SqlKind.OTHER_FUNCTION)
+  val HOP_ROWTIME: SqlGroupFunction =
+    new SqlGroupFunction(
+      "HOP_ROWTIME",
+      SqlKind.OTHER_FUNCTION,
+      HOP,
+      // ensure that returned rowtime is always NOT_NULLABLE
+      ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NOT_NULLABLE),
+      HOP.getOperandTypeChecker)
   val HOP_PROCTIME: SqlGroupFunction = HOP.auxiliary("HOP_PROCTIME", SqlKind.OTHER_FUNCTION)
 
   val SESSION: SqlGroupFunction = new SqlGroupFunction(
@@ -478,7 +491,13 @@ object BasicOperatorTable {
   val SESSION_START: SqlGroupFunction = SESSION.auxiliary(SqlKind.SESSION_START)
   val SESSION_END: SqlGroupFunction = SESSION.auxiliary(SqlKind.SESSION_END)
   val SESSION_ROWTIME: SqlGroupFunction =
-    SESSION.auxiliary("SESSION_ROWTIME", SqlKind.OTHER_FUNCTION)
+    new SqlGroupFunction(
+      "SESSION_ROWTIME",
+      SqlKind.OTHER_FUNCTION,
+      SESSION,
+      // ensure that returned rowtime is always NOT_NULLABLE
+      ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NOT_NULLABLE),
+      SESSION.getOperandTypeChecker)
   val SESSION_PROCTIME: SqlGroupFunction =
     SESSION.auxiliary("SESSION_PROCTIME", SqlKind.OTHER_FUNCTION)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
index cb31866..59aee9f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
@@ -21,7 +21,6 @@ package org.apache.flink.table.api.batch.sql
 import java.sql.Timestamp
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.logical._
@@ -63,6 +62,7 @@ class GroupWindowTest extends TableTestBase {
       "SELECT " +
         "  TUMBLE_START(ts, INTERVAL '4' MINUTE), " +
         "  TUMBLE_END(ts, INTERVAL '4' MINUTE), " +
+        "  TUMBLE_ROWTIME(ts, INTERVAL '4' MINUTE), " +
         "  c, " +
         "  SUM(a) AS sumA, " +
         "  MIN(b) AS minB " +
@@ -78,9 +78,10 @@ class GroupWindowTest extends TableTestBase {
           term("groupBy", "c"),
           term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)),
           term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " +
-            "start('w$) AS w$start, end('w$) AS w$end")
+            "start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime")
         ),
-        term("select", "CAST(w$start) AS EXPR$0, CAST(w$end) AS EXPR$1, c, sumA, minB")
+        term("select", "CAST(w$start) AS EXPR$0, CAST(w$end) AS EXPR$1, " +
+          "w$rowtime AS EXPR$2, c, sumA, minB")
       )
 
     util.verifySql(sqlQuery, expected)
@@ -149,6 +150,7 @@ class GroupWindowTest extends TableTestBase {
         "  c, " +
         "  HOP_END(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " +
         "  HOP_START(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " +
+        "  HOP_ROWTIME(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " +
         "  SUM(a) AS sumA, " +
         "  AVG(b) AS avgB " +
         "FROM T " +
@@ -164,9 +166,10 @@ class GroupWindowTest extends TableTestBase {
           term("window",
             SlidingGroupWindow('w$, 'ts, 10800000.millis, 3600000.millis)),
           term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " +
-            "start('w$) AS w$start, end('w$) AS w$end")
+            "start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime")
         ),
-        term("select", "c, CAST(w$end) AS EXPR$1, CAST(w$start) AS EXPR$2, sumA, avgB")
+        term("select", "c, CAST(w$end) AS EXPR$1, CAST(w$start) AS EXPR$2, " +
+          "w$rowtime AS EXPR$3, sumA, avgB")
       )
 
     util.verifySql(sqlQuery, expected)
@@ -205,6 +208,7 @@ class GroupWindowTest extends TableTestBase {
         "  c, d, " +
         "  SESSION_START(ts, INTERVAL '12' HOUR), " +
         "  SESSION_END(ts, INTERVAL '12' HOUR), " +
+        "  SESSION_ROWTIME(ts, INTERVAL '12' HOUR), " +
         "  SUM(a) AS sumA, " +
         "  MIN(b) AS minB " +
         "FROM T " +
@@ -219,9 +223,10 @@ class GroupWindowTest extends TableTestBase {
           term("groupBy", "c, d"),
           term("window", SessionGroupWindow('w$, 'ts, 43200000.millis)),
           term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " +
-            "start('w$) AS w$start, end('w$) AS w$end")
+            "start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime")
         ),
-        term("select", "c, d, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3, sumA, minB")
+        term("select", "c, d, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3, " +
+          "w$rowtime AS EXPR$4, sumA, minB")
       )
 
     util.verifySql(sqlQuery, expected)
@@ -250,7 +255,7 @@ class GroupWindowTest extends TableTestBase {
           ),
           term("groupBy", "c"),
           term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)),
-          term("select", "c, start('w$) AS w$start, end('w$) AS w$end")
+          term("select", "c, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime")
         ),
         term("select", "CAST(w$end) AS EXPR$0")
       )
@@ -288,7 +293,8 @@ class GroupWindowTest extends TableTestBase {
             "COUNT(*) AS EXPR$0",
             "SUM(a) AS $f1",
             "start('w$) AS w$start",
-            "end('w$) AS w$end")
+            "end('w$) AS w$end, " +
+            "rowtime('w$) AS w$rowtime")
         ),
         term("select", "EXPR$0", "CAST(w$start) AS EXPR$1"),
         term("where",

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
index cbf3029..e32b0a9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.api.batch.sql.validation
 import java.sql.Timestamp
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.utils.TableTestBase
@@ -80,21 +80,6 @@ class GroupWindowValidationTest extends TableTestBase {
     util.verifySql(sql, "n/a")
   }
 
-  @Test(expected = classOf[TableException])
-  def testWindowRowtime(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
-
-    val sqlQuery =
-      "SELECT " +
-        "  TUMBLE_ROWTIME(ts, INTERVAL '4' MINUTE)" +
-        "FROM T " +
-        "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c"
-
-    // should fail because ROWTIME properties are not yet supported in batch
-    util.verifySql(sqlQuery, "FAIL")
-  }
-
   @Test(expected = classOf[ValidationException])
   def testWindowProctime(): Unit = {
     val util = batchTestUtil()

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
index 6a2f1a7..ad44e09 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.api.batch.table
 
+import java.sql.Timestamp
+
 import org.apache.flink.api.scala._
 import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
 import org.apache.flink.table.api.scala._
@@ -146,6 +148,50 @@ class GroupWindowTest extends TableTestBase {
     util.verifyTable(windowedTable, expected)
   }
 
+  @Test
+  def testLongEventTimeTumblingGroupWindowWithProperties(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('ts, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.hours on 'ts as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'ts, 2.hours)),
+      term("select", "string", "COUNT(int) AS TMP_0",
+        "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testTimestampEventTimeTumblingGroupWindowWithProperties(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Timestamp, Int, String)]('ts, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.hours on 'ts as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'ts, 2.hours)),
+      term("select", "string", "COUNT(int) AS TMP_0",
+        "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
   //===============================================================================================
   // Sliding Windows
   //===============================================================================================
@@ -268,6 +314,50 @@ class GroupWindowTest extends TableTestBase {
     util.verifyTable(windowedTable, expected)
   }
 
+  @Test
+  def testLongEventTimeSlidingGroupWindowWithProperties(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('ts, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 1.hour every 10.minutes on 'ts as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'ts, 1.hour, 10.minutes)),
+      term("select", "string", "COUNT(int) AS TMP_0",
+        "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testTimestampEventTimeSlidingGroupWindowWithProperties(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Timestamp, Int, String)]('ts, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 1.hour every 10.minutes on 'ts as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'ts, 1.hour, 10.minutes)),
+      term("select", "string", "COUNT(int) AS TMP_0",
+        "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
   //===============================================================================================
   // Session Windows
   //===============================================================================================
@@ -315,4 +405,48 @@ class GroupWindowTest extends TableTestBase {
 
     util.verifyTable(windowedTable, expected)
   }
+
+  @Test
+  def testLongEventTimeSessionGroupWindowWithProperties(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('ts, 'int, 'string)
+
+    val windowedTable = table
+      .window(Session withGap 30.minutes on 'ts as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window", SessionGroupWindow(WindowReference("w"), 'ts, 30.minutes)),
+      term("select", "string", "COUNT(int) AS TMP_0",
+        "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testTimestampEventTimeSessionGroupWindowWithProperties(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Timestamp, Int, String)]('ts, 'int, 'string)
+
+    val windowedTable = table
+      .window(Session withGap 30.minutes on 'ts as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime)
+
+    val expected = unaryNode(
+      "DataSetWindowAggregate",
+      batchTableNode(0),
+      term("groupBy", "string"),
+      term("window", SessionGroupWindow(WindowReference("w"), 'ts, 30.minutes)),
+      term("select", "string", "COUNT(int) AS TMP_0",
+        "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
index aa934c6..b105ec02 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -18,8 +18,7 @@
 
 package org.apache.flink.table.runtime.batch.sql
 
-import java.sql.Timestamp
-
+import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTimestamp}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.TableEnvironment
@@ -312,7 +311,7 @@ class AggregateITCase(
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
       // create timestamps
-      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+      .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
     val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
@@ -341,10 +340,10 @@ class AggregateITCase(
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
       // create timestamps
-      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+      .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     val expected = Seq(
       "1,{1=1}",
       "2,{2=1}", "2,{2=1}",
@@ -358,6 +357,41 @@ class AggregateITCase(
   }
 
   @Test
+  def testTumbleWindowWithProperties(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery =
+      "SELECT b, COUNT(a), " +
+        "TUMBLE_START(ts, INTERVAL '5' SECOND), " +
+        "TUMBLE_END(ts, INTERVAL '5' SECOND), " +
+        "TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND)" +
+      "FROM T " +
+      "GROUP BY b, TUMBLE(ts, INTERVAL '5' SECOND)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // min time unit is seconds
+      .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000)))
+    tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
+    val expected = Seq(
+      "1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999",
+      "2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999",
+      "3,1,1970-01-01 00:00:00.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999",
+      "3,2,1970-01-01 00:00:05.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999",
+      "4,3,1970-01-01 00:00:05.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999",
+      "4,1,1970-01-01 00:00:10.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999",
+      "5,4,1970-01-01 00:00:10.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999",
+      "5,1,1970-01-01 00:00:15.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999",
+      "6,4,1970-01-01 00:00:15.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999",
+      "6,2,1970-01-01 00:00:20.0,1970-01-01 00:00:25.0,1970-01-01 00:00:24.999"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
+  @Test
   def testHopWindowAggregate(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -365,8 +399,6 @@ class AggregateITCase(
     tEnv.registerFunction("countFun", new CountAggFunction)
     tEnv.registerFunction("wAvgWithMergeAndReset", new WeightedAvgWithMergeAndReset)
 
-    env.setParallelism(1)
-
     val sqlQuery =
       "SELECT b, SUM(a), countFun(c), wAvgWithMergeAndReset(b, a), wAvgWithMergeAndReset(a, a)" +
         "FROM T " +
@@ -374,7 +406,7 @@ class AggregateITCase(
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
       // create timestamps
-      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+      .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
     val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
@@ -391,6 +423,48 @@ class AggregateITCase(
   }
 
   @Test
+  def testHopWindowWithProperties(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery =
+      "SELECT b, COUNT(a), " +
+        "HOP_START(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND), " +
+        "HOP_END(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND), " +
+        "HOP_ROWTIME(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND) " +
+      "FROM T " +
+      "GROUP BY b, HOP(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // create timestamps
+      .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000)))
+    tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
+    val expected = Seq(
+      "1,1,1969-12-31 23:59:55.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999",
+      "2,2,1969-12-31 23:59:55.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999",
+      "3,1,1969-12-31 23:59:55.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999",
+      "1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999",
+      "2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999",
+      "3,3,1970-01-01 00:00:00.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999",
+      "4,3,1970-01-01 00:00:00.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999",
+      "3,2,1970-01-01 00:00:05.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999",
+      "4,4,1970-01-01 00:00:05.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999",
+      "5,4,1970-01-01 00:00:05.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999",
+      "4,1,1970-01-01 00:00:10.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999",
+      "5,5,1970-01-01 00:00:10.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999",
+      "6,4,1970-01-01 00:00:10.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999",
+      "5,1,1970-01-01 00:00:15.0,1970-01-01 00:00:25.0,1970-01-01 00:00:24.999",
+      "6,6,1970-01-01 00:00:15.0,1970-01-01 00:00:25.0,1970-01-01 00:00:24.999",
+      "6,2,1970-01-01 00:00:20.0,1970-01-01 00:00:30.0,1970-01-01 00:00:29.999"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
+  @Test
   def testSessionWindowAggregate(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -398,8 +472,6 @@ class AggregateITCase(
     tEnv.registerFunction("countFun", new CountAggFunction)
     tEnv.registerFunction("wAvgWithMergeAndReset", new WeightedAvgWithMergeAndReset)
 
-    env.setParallelism(1)
-
     val sqlQuery =
       "SELECT MIN(a), MAX(a), SUM(a), countFun(c), wAvgWithMergeAndReset(b, a), " +
         "wAvgWithMergeAndReset(a, a)" +
@@ -409,7 +481,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
       // create timestamps
       .filter(x => (x._2 % 2) == 0)
-      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+      .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
     val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
@@ -420,4 +492,33 @@ class AggregateITCase(
 
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
+
+  @Test
+  def testSessionWindowWithProperties(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery =
+      "SELECT COUNT(a), " +
+        "SESSION_START(ts, INTERVAL '4' SECOND), " +
+        "SESSION_END(ts, INTERVAL '4' SECOND), " +
+        "SESSION_ROWTIME(ts, INTERVAL '4' SECOND) " +
+      "FROM T " +
+      "GROUP BY SESSION(ts, INTERVAL '4' SECOND)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // create timestamps
+      .filter(x => (x._2 % 2) == 0)
+      .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000)))
+    tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
+    val expected = Seq(
+      "6,1970-01-01 00:00:02.0,1970-01-01 00:00:14.0,1970-01-01 00:00:13.999",
+      "6,1970-01-01 00:00:16.0,1970-01-01 00:00:25.0,1970-01-01 00:00:24.999"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
index 718bc5c..3d9223e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
@@ -22,7 +22,7 @@ import java.math.BigDecimal
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -101,14 +101,15 @@ class GroupWindowITCase(
     val windowedTable = table
       .window(Tumble over 5.milli on 'long as 'w)
       .groupBy('w, 'string)
-      .select('string, 'int.sum, 'w.start, 'w.end)
+      .select('string, 'int.sum, 'w.start, 'w.end, 'w.rowtime)
 
-    val expected = "Hello world,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" +
-      "Hello world,4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02\n" +
-      "Hello,7,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
-      "Hello,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" +
-      "Hallo,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n"
+    val expected =
+      "Hello world,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" +
+      "Hello world,4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,1970-01-01 00:00:00.019\n" +
+      "Hello,7,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" +
+      "Hello,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" +
+      "Hallo,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" +
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n"
 
     val results = windowedTable.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
@@ -126,11 +127,12 @@ class GroupWindowITCase(
     val windowedTable = table
       .window(Tumble over 5.milli on 'long as 'w)
       .groupBy('w)
-      .select('int.sum, 'w.start, 'w.end)
+      .select('int.sum, 'w.start, 'w.end, 'w.rowtime)
 
-    val expected = "10,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
-      "6,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" +
-      "4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02\n"
+    val expected =
+      "10,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" +
+      "6,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" +
+      "4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,1970-01-01 00:00:00.019\n"
 
     val results = windowedTable.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
@@ -147,15 +149,16 @@ class GroupWindowITCase(
     val windowedTable = table
       .window(Session withGap 7.milli on 'long as 'w)
       .groupBy('string, 'w)
-      .select('string, 'string.count, 'w.start, 'w.end)
+      .select('string, 'string.count, 'w.start, 'w.end, 'w.rowtime)
 
     val results = windowedTable.toDataSet[Row].collect()
 
-    val expected = "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009\n" +
-      "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.015\n" +
-      "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.023\n" +
-      "Hello,3,1970-01-01 00:00:00.003,1970-01-01 00:00:00.014\n" +
-      "Hi,1,1970-01-01 00:00:00.001,1970-01-01 00:00:00.008"
+    val expected =
+      "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009,1970-01-01 00:00:00.008\n" +
+      "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.015,1970-01-01 00:00:00.014\n" +
+      "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.023,1970-01-01 00:00:00.022\n" +
+      "Hello,3,1970-01-01 00:00:00.003,1970-01-01 00:00:00.014,1970-01-01 00:00:00.013\n" +
+      "Hi,1,1970-01-01 00:00:00.001,1970-01-01 00:00:00.008,1970-01-01 00:00:00.007"
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -167,18 +170,20 @@ class GroupWindowITCase(
       .fromCollection(data)
       .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
 
-    val results =table
+    val results = table
       .window(Session withGap 2.milli on 'long as 'w)
       .groupBy('w)
-      .select('string.count, 'w.start, 'w.end).toDataSet[Row].collect()
+      .select('string.count, 'w.start, 'w.end, 'w.rowtime)
+      .toDataSet[Row].collect()
 
-    val expected = "4,1970-01-01 00:00:00.001,1970-01-01 00:00:00.006\n" +
-      "2,1970-01-01 00:00:00.007,1970-01-01 00:00:00.01\n" +
-      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.018"
+    val expected =
+      "4,1970-01-01 00:00:00.001,1970-01-01 00:00:00.006,1970-01-01 00:00:00.005\n" +
+      "2,1970-01-01 00:00:00.007,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" +
+      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.018,1970-01-01 00:00:00.017"
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testMultiGroupWindow(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -186,14 +191,24 @@ class GroupWindowITCase(
     val table = env
       .fromCollection(data)
       .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-    table
-      .window(Tumble over 5.milli on 'long as 'w)
+
+    val results = table
+      .window(Tumble over 2.milli on 'long as 'w)
       .groupBy('w, 'string)
-      .select('string, 'int.count)
-      .window( Slide over 5.milli every 1.milli on 'int as 'w2)
-      .groupBy('w2)
-      .select('string)
-      .toDataSet[Row]
+      .select('string, 'int.count as 'cnt, 'w.rowtime as 'time)
+      .window(Tumble over 6.milli on 'time as 'w2)
+      .groupBy('w2, 'string)
+      .select('string, 'cnt.sum as 'cnt, 'w2.end)
+      .toDataSet[Row].collect()
+
+    val expected =
+      "Hallo,1,1970-01-01 00:00:00.006\n" +
+      "Hello world,1,1970-01-01 00:00:00.012\n" +
+      "Hello world,1,1970-01-01 00:00:00.018\n" +
+      "Hello,1,1970-01-01 00:00:00.012\n" +
+      "Hello,2,1970-01-01 00:00:00.006\n" +
+      "Hi,1,1970-01-01 00:00:00.006\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   // ----------------------------------------------------------------------------------------------
@@ -230,18 +245,18 @@ class GroupWindowITCase(
     val windowedTable = table
       .window(Slide over 5.milli every 2.milli on 'long as 'w)
       .groupBy('w)
-      .select('int.count, 'w.start, 'w.end)
+      .select('int.count, 'w.start, 'w.end, 'w.rowtime)
 
     val expected =
-      "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
-      "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
-      "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019\n" +
-      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
-      "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003\n" +
-      "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011\n" +
-      "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007\n" +
-      "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
-      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
+      "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013,1970-01-01 00:00:00.012\n" +
+      "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017,1970-01-01 00:00:00.016\n" +
+      "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019,1970-01-01 00:00:00.018\n" +
+      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021,1970-01-01 00:00:00.02\n" +
+      "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003,1970-01-01 00:00:00.002\n" +
+      "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011,1970-01-01 00:00:00.01\n" +
+      "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007,1970-01-01 00:00:00.006\n" +
+      "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009,1970-01-01 00:00:00.008\n" +
+      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004"
 
     val results = windowedTable.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 32e3724..c49af5c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -418,7 +418,9 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val resultHopStartEndWithHaving = tEnv.sql(sqlQueryHopStartEndWithHaving).toAppendStream[Row]
+    val resultHopStartEndWithHaving = tEnv
+      .sqlQuery(sqlQueryHopStartEndWithHaving)
+      .toAppendStream[Row]
     resultHopStartEndWithHaving.addSink(new StreamITCase.StringSink[Row])
 
     env.execute()

http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
index a9d4e44..1eebeee 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
@@ -254,25 +254,25 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
     val windowedTable = table
       .window(Slide over 5.milli every 2.milli on 'long as 'w)
       .groupBy('w)
-      .select('int.count, 'w.start, 'w.end)
+      .select('int.count, 'w.start, 'w.end, 'w.rowtime)
 
     val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = Seq(
-      "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
-      "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
-      "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019",
-      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
-      "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003",
-      "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011",
-      "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007",
-      "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
-      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
-      "1,1970-01-01 00:00:00.028,1970-01-01 00:00:00.033",
-      "1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.035",
-      "1,1970-01-01 00:00:00.032,1970-01-01 00:00:00.037")
+      "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013,1970-01-01 00:00:00.012",
+      "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017,1970-01-01 00:00:00.016",
+      "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019,1970-01-01 00:00:00.018",
+      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021,1970-01-01 00:00:00.02",
+      "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003,1970-01-01 00:00:00.002",
+      "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011,1970-01-01 00:00:00.01",
+      "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007,1970-01-01 00:00:00.006",
+      "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009,1970-01-01 00:00:00.008",
+      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004",
+      "1,1970-01-01 00:00:00.028,1970-01-01 00:00:00.033,1970-01-01 00:00:00.032",
+      "1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.035,1970-01-01 00:00:00.034",
+      "1,1970-01-01 00:00:00.032,1970-01-01 00:00:00.037,1970-01-01 00:00:00.036")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 


Mime
View raw message