flink-commits mailing list archives

From fhue...@apache.org
Subject [2/5] flink git commit: [FLINK-4691] [table] Add group-windows for streaming tables to Table API.
Date Wed, 26 Oct 2016 21:11:24 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
index 738953d..d81f3a1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
@@ -19,11 +19,10 @@ package org.apache.flink.api.table.runtime.aggregate
 
 import java.lang.Iterable
 
-import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.api.table.Row
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-import org.apache.flink.util.Preconditions
+import org.apache.flink.util.{Collector, Preconditions}
 
 import scala.collection.JavaConversions._
 
@@ -41,7 +40,8 @@ class AggregateReduceGroupFunction(
     private val aggregates: Array[Aggregate[_ <: Any]],
     private val groupKeysMapping: Array[(Int, Int)],
     private val aggregateMapping: Array[(Int, Int)],
-    private val intermediateRowArity: Int)
+    private val intermediateRowArity: Int,
+    private val finalRowArity: Int)
     extends RichGroupReduceFunction[Row, Row] {
 
   private var aggregateBuffer: Row = _
@@ -50,9 +50,8 @@ class AggregateReduceGroupFunction(
   override def open(config: Configuration) {
     Preconditions.checkNotNull(aggregates)
     Preconditions.checkNotNull(groupKeysMapping)
-    val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
     aggregateBuffer = new Row(intermediateRowArity)
-    output = new Row(finalRowLength)
+    output = new Row(finalRowArity)
   }
 
   /**
@@ -77,13 +76,13 @@ class AggregateReduceGroupFunction(
     })
 
     // Set group keys value to final output.
-    groupKeysMapping.map {
+    groupKeysMapping.foreach {
       case (after, previous) =>
         output.setField(after, last.productElement(previous))
     }
 
     // Evaluate final aggregate value and set to output.
-    aggregateMapping.map {
+    aggregateMapping.foreach {
       case (after, previous) =>
         output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
     }
@@ -91,96 +90,3 @@ class AggregateReduceGroupFunction(
     out.collect(output)
   }
 }
-
-/**
- * It wraps the aggregate logic inside of 
- * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and 
- * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
- *
- * @param aggregates   The aggregate functions.
- * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
- *                         and output Row.
- * @param aggregateMapping The index mapping between aggregate function list and aggregated value
- *                         index in output Row.
- */
-class AggregateReduceCombineFunction(
-    private val aggregates: Array[Aggregate[_ <: Any]],
-    private val groupKeysMapping: Array[(Int, Int)],
-    private val aggregateMapping: Array[(Int, Int)],
-    private val intermediateRowArity: Int)
-    extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] {
-
-  private var aggregateBuffer: Row = _
-  private var output: Row = _
-
-  override def open(config: Configuration): Unit = {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
-    val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
-    aggregateBuffer = new Row(intermediateRowArity)
-    output = new Row(finalRowLength)
-  }
-
-  /**
-   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
-   * calculate aggregated values output by aggregate buffer, and set them into output 
-   * Row based on the mapping relation between intermediate aggregate Row and output Row.
-   *
-   * @param records  Grouped intermediate aggregate Rows iterator.
-   * @param out The collector to hand results to.
-   *
-   */
-  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
-
-    // Initiate intermediate aggregate value.
-    aggregates.foreach(_.initiate(aggregateBuffer))
-
-    // Merge intermediate aggregate value to buffer.
-    var last: Row = null
-    records.foreach((record) => {
-      aggregates.foreach(_.merge(record, aggregateBuffer))
-      last = record
-    })
-
-    // Set group keys value to final output.
-    groupKeysMapping.map {
-      case (after, previous) =>
-        output.setField(after, last.productElement(previous))
-    }
-
-    // Evaluate final aggregate value and set to output.
-    aggregateMapping.map {
-      case (after, previous) =>
-        output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
-    }
-
-    out.collect(output)
-  }
-
-  /**
-   * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
-   *
-   * @param records  Sub-grouped intermediate aggregate Rows iterator.
-   * @return Combined intermediate aggregate Row.
-   *
-   */
-  override def combine(records: Iterable[Row]): Row = {
-
-    // Initiate intermediate aggregate value.
-    aggregates.foreach(_.initiate(aggregateBuffer))
-
-    // Merge intermediate aggregate value to buffer.
-    var last: Row = null
-    records.foreach((record) => {
-      aggregates.foreach(_.merge(record, aggregateBuffer))
-      last = record
-    })
-
-    // Set group keys to aggregateBuffer.
-    for (i <- 0 until groupKeysMapping.length) {
-      aggregateBuffer.setField(i, last.productElement(i))
-    }
-
-    aggregateBuffer
-  }
-}
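
The reduce function above no longer derives the output row length from groupKeysMapping and aggregateMapping but receives it as the new finalRowArity constructor argument, so the emitted row can keep trailing placeholder fields (e.g. for window start/end properties filled in later by a collector). A minimal sketch, assuming made-up field counts and a single MIN aggregate; this is illustration only, not code from the commit:

    // Hedged sketch: all field counts and the chosen aggregate are assumptions.
    val aggregates = Array[Aggregate[_ <: Any]](new ByteMinAggregate)
    val groupKeysMapping = Array((0, 0))   // output field 0 <- group key 0 of the intermediate row
    val aggregateMapping = Array((1, 0))   // output field 1 <- result of aggregate 0
    val intermediateRowArity = 2           // assumed layout: [group key, partial aggregate]
    val finalRowArity = 2 + 2              // key + aggregate + two placeholders for window properties

    val reduceFunction = new AggregateReduceGroupFunction(
      aggregates, groupKeysMapping, aggregateMapping, intermediateRowArity, finalRowArity)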

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
index 65d12c3..903cc07 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
@@ -21,14 +21,14 @@ import java.util
 
 import org.apache.calcite.rel.`type`._
 import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
 import org.apache.calcite.sql.fun._
-import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableConfig, TableException}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableException}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
@@ -61,10 +61,12 @@ object AggregateUtil {
    * }}}
    *
    */
-  def createOperatorFunctionsForAggregates(namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType, outputType: RelDataType,
-      groupings: Array[Int],
-      config: TableConfig): (MapFunction[Any, Row], GroupReduceFunction[Row, Row] ) = {
+  def createOperatorFunctionsForAggregates(
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int])
+    : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
 
     val aggregateFunctionsAndFieldIndexes =
       transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
@@ -98,12 +100,20 @@ object AggregateUtil {
 
     val reduceGroupFunction =
       if (allPartialAggregate) {
-        new AggregateReduceCombineFunction(aggregates, groupingOffsetMapping,
-          aggOffsetMapping, intermediateRowArity)
+        new AggregateReduceCombineFunction(
+          aggregates,
+          groupingOffsetMapping,
+          aggOffsetMapping,
+          intermediateRowArity,
+          outputType.getFieldCount)
       }
       else {
-        new AggregateReduceGroupFunction(aggregates, groupingOffsetMapping,
-          aggOffsetMapping, intermediateRowArity)
+        new AggregateReduceGroupFunction(
+          aggregates,
+          groupingOffsetMapping,
+          aggOffsetMapping,
+          intermediateRowArity,
+          outputType.getFieldCount)
       }
 
     (mapFunction, reduceGroupFunction)
@@ -181,7 +191,7 @@ object AggregateUtil {
           }
         }
         case sqlMinMaxFunction: SqlMinMaxAggFunction => {
-          aggregates(index) = if (sqlMinMaxFunction.isMin) {
+          aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
             sqlTypeName match {
               case TINYINT =>
                 new ByteMinAggregate
@@ -305,7 +315,7 @@ object AggregateUtil {
               // input data, so if inputIndex is not -1, it must be a group key. Then we can
               // find the field index in buffer data by the group keys index mapping between
               // input data and buffer data.
-              for (i <- 0 until groupKeys.length) {
+              for (i <- groupKeys.indices) {
                 if (inputIndex == groupKeys(i)) {
                   groupingOffsetMapping += ((outputIndex, i))
                 }
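
createOperatorFunctionsForAggregates now returns the reduce side as a RichGroupReduceFunction (presumably so it can also be wrapped by the new AggregateWindowFunction) and no longer takes a TableConfig. A minimal sketch of how the returned pair might be wired on a batch DataSet, assuming the intermediate rows carry the group keys in their leading fields; the wiring is an assumption for illustration, not the commit's translation code:

    import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
    import org.apache.flink.api.java.DataSet
    import org.apache.flink.api.table.Row

    // Hedged sketch: wire the prepare-map and the grouped reduce produced by AggregateUtil.
    def wireAggregate(
        input: DataSet[Any],
        mapFunction: MapFunction[Any, Row],
        reduceFunction: RichGroupReduceFunction[Row, Row],
        groupings: Array[Int]): DataSet[Row] = {
      input
        .map(mapFunction)                          // map records to intermediate aggregate rows
        .groupBy(groupings.indices.toArray: _*)    // group keys occupy the leading intermediate fields
        .reduceGroup(reduceFunction)               // merge partial aggregates and emit final rows
    }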

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
new file mode 100644
index 0000000..c65ac35
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+class AggregateWindowFunction(
+    propertyReads: Array[WindowPropertyRead[_ <: Any]],
+    groupReduceFunction: RichGroupReduceFunction[Row, Row])
+  extends RichWindowFunction[Row, Row, Tuple, Window] {
+
+  private var propertyCollector: PropertyCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    groupReduceFunction.open(parameters)
+    propertyCollector = new PropertyCollector(propertyReads)
+  }
+
+  override def apply(
+      key: Tuple,
+      window: Window,
+      input: Iterable[Row],
+      out: Collector[Row])
+    : Unit = {
+
+    // extract the properties from window
+    propertyReads.foreach(_.extract(window))
+
+    // set final collector
+    propertyCollector.finalCollector = out
+
+    // call wrapped reduce function with property collector
+    groupReduceFunction.reduce(input, propertyCollector)
+  }
+}
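
AggregateWindowFunction is a thin adapter: open() forwards to the wrapped RichGroupReduceFunction, and apply() extracts the window properties, points the PropertyCollector at the real output collector and then delegates to reduce(). A minimal construction sketch, assuming the wrapped reduce function has already been built by AggregateUtil (not code from the commit):

    // Hedged sketch: the property readers and the wrapped reduce function are assumptions.
    val windowFunction = new AggregateWindowFunction(
      Array[WindowPropertyRead[_ <: Any]](new WindowStartRead, new WindowEndRead),
      reduceFunction) // e.g. the RichGroupReduceFunction returned by createOperatorFunctionsForAggregates

The wrapper can then be applied to a keyed, windowed DataStream of rows in the usual way.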

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala
new file mode 100644
index 0000000..763ed0b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Adds properties to the end of a row before emitting it to the final collector.
+  * The collector assumes that the row has placeholders at the end that can be filled.
+  */
+class PropertyCollector(properties: Array[WindowPropertyRead[_ <: Any]]) extends Collector[Row] {
+  var finalCollector: Collector[Row] = _
+
+  override def collect(record: Row): Unit = {
+    var i: Int = 0
+    while (i < properties.length) {
+      val idx = record.productArity - properties.length + i
+      record.setField(idx, properties(i).get())
+      i = i + 1
+    }
+    finalCollector.collect(record)
+  }
+
+  override def close(): Unit = finalCollector.close()
+}
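
The collector writes the extracted window properties into the last properties.length fields of each row and then forwards it. A small sketch, assuming a one-minute TimeWindow and made-up payload values (illustration only):

    import org.apache.flink.api.table.Row
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    // Hedged sketch: fill the two trailing placeholder fields with the window start and end.
    val properties = Array[WindowPropertyRead[_ <: Any]](new WindowStartRead, new WindowEndRead)
    properties.foreach(_.extract(new TimeWindow(0L, 60000L)))   // a one-minute window

    val collector = new PropertyCollector(properties)
    collector.finalCollector = new Collector[Row] {
      override def collect(record: Row): Unit = println(record)  // stand-in for the real downstream collector
      override def close(): Unit = ()
    }

    val row = new Row(4)      // two payload fields plus two placeholders
    row.setField(0, "key")
    row.setField(1, 42)
    collector.collect(row)    // fields 2 and 3 now hold the window start and end timestamps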

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowEndRead.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowEndRead.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowEndRead.scala
new file mode 100644
index 0000000..dd12238
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowEndRead.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
+
+class WindowEndRead extends WindowPropertyRead[Timestamp] {
+
+  private var ts: Timestamp = _
+
+  override def extract(window: Window): Unit = window match {
+    case timeWindow: TimeWindow =>
+      ts = SqlFunctions.internalToTimestamp(timeWindow.getEnd)
+    case _ =>
+      ts = null
+  }
+
+  override def get(): Timestamp = ts
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowPropertyRead.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowPropertyRead.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowPropertyRead.scala
new file mode 100644
index 0000000..88ea30d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowPropertyRead.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Base class for reading a window property. The property will be extracted once and
+  * can be read multiple times.
+  */
+trait WindowPropertyRead[T] extends Serializable {
+
+  def extract(window: Window): Unit
+
+  def get(): T
+
+}
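
The trait fixes a simple contract: extract() is called once per window, and get() may then be read several times while filling the output row. A hypothetical additional reader, sketched here only to illustrate the contract (it is not part of the commit):

    import java.sql.Timestamp

    import org.apache.calcite.runtime.SqlFunctions
    import org.apache.flink.streaming.api.windowing.windows.Window

    // Hedged sketch: exposes the window's maximum timestamp as a SQL timestamp.
    class WindowMaxTimestampRead extends WindowPropertyRead[Timestamp] {

      private var ts: Timestamp = _

      override def extract(window: Window): Unit =
        ts = SqlFunctions.internalToTimestamp(window.maxTimestamp())

      override def get(): Timestamp = ts
    }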

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowStartRead.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowStartRead.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowStartRead.scala
new file mode 100644
index 0000000..d92cb18
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowStartRead.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
+
+class WindowStartRead extends WindowPropertyRead[Timestamp] {
+
+  private var ts: Timestamp = _
+
+  override def extract(window: Window): Unit = window match {
+    case timeWindow: TimeWindow =>
+      ts = SqlFunctions.internalToTimestamp(timeWindow.getStart)
+    case _ =>
+      ts = null
+  }
+
+  override def get(): Timestamp = ts
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 9d96780..ebb4dcb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -20,8 +20,8 @@ package org.apache.flink.api.table
 import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.table.expressions.{Asc, ExpressionParser, UnresolvedAlias, Expression, Ordering}
-import org.apache.flink.api.table.plan.RexNodeTranslator._
+import org.apache.flink.api.table.expressions.{Asc, Expression, ExpressionParser, Ordering}
+import org.apache.flink.api.table.plan.ProjectionTranslator._
 import org.apache.flink.api.table.plan.logical._
 import org.apache.flink.api.table.sinks.TableSink
 
@@ -76,7 +76,7 @@ class Table(
     * }}}
     */
   def select(fields: Expression*): Table = {
-    val projectionOnAggregates = fields.map(extractAggregations(_, tableEnv))
+    val projectionOnAggregates = fields.map(extractAggregationsAndProperties(_, tableEnv))
     val aggregations = projectionOnAggregates.flatMap(_._2)
     val projectList = expandProjectList(projectionOnAggregates.map(_._1), logicalPlan)
     if (aggregations.nonEmpty) {
@@ -201,9 +201,6 @@ class Table(
     * }}}
     */
   def groupBy(fields: Expression*): GroupedTable = {
-    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      throw new ValidationException(s"Group by on stream tables is currently not supported.")
-    }
     new GroupedTable(this, fields)
   }
 
@@ -632,6 +629,28 @@ class Table(
     // emit the table to the configured table sink
     tableEnv.writeToSink(this, configuredSink)
   }
+
+  /**
+    * Groups the records of a table by assigning them to windows defined by a time or row interval.
+    *
+    * For streaming tables of infinite size, grouping into windows is required to define finite
+    * groups on which group-based aggregates can be computed.
+    *
+    * For batch tables of finite size, windowing essentially provides shortcuts for time-based
+    * groupBy.
+    *
+    * __Note__: a window on a non-grouped streaming table is a non-parallel operation, i.e., all data
+    * will be processed by a single operator.
+    *
+    * @param groupWindow group-window that specifies how elements are grouped.
+    * @return A windowed table.
+    */
+  def window(groupWindow: GroupWindow): GroupWindowedTable = {
+    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
+      throw new ValidationException(s"Windows on batch tables are currently not supported.")
+    }
+    new GroupWindowedTable(this, Seq(), groupWindow)
+  }
 }
 
 /**
@@ -653,19 +672,25 @@ class GroupedTable(
     */
   def select(fields: Expression*): Table = {
 
-    val projectionOnAggregates = fields.map(extractAggregations(_, table.tableEnv))
-    val aggregations = projectionOnAggregates.flatMap(_._2)
+    val projectionOnAggsAndProps = fields.map(extractAggregationsAndProperties(_, table.tableEnv))
+    val aggregations = projectionOnAggsAndProps.flatMap(_._2)
+    val properties = projectionOnAggsAndProps.flatMap(_._3)
 
-    val logical = if (aggregations.nonEmpty) {
-      Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
-        Aggregate(groupKey, aggregations, table.logicalPlan).validate(table.tableEnv)
-      )
-    } else {
-      Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
-        Aggregate(groupKey, Nil, table.logicalPlan).validate(table.tableEnv))
+    if (properties.nonEmpty) {
+      throw ValidationException("Window properties can only be used on windowed tables.")
     }
 
-    new Table(table.tableEnv, logical.validate(table.tableEnv))
+    val logical =
+      Project(
+        projectionOnAggsAndProps.map(e => UnresolvedAlias(e._1)),
+        Aggregate(
+          groupKey,
+          aggregations,
+          table.logicalPlan
+        ).validate(table.tableEnv)
+      ).validate(table.tableEnv)
+
+    new Table(table.tableEnv, logical)
   }
 
   /**
@@ -675,11 +700,84 @@ class GroupedTable(
     * Example:
     *
     * {{{
-    *   tab.groupBy("key").select("key, value.avg + " The average" as average")
+    *   tab.groupBy("key").select("key, value.avg + ' The average' as average")
     * }}}
     */
   def select(fields: String): Table = {
     val fieldExprs = ExpressionParser.parseExpressionList(fields)
     select(fieldExprs: _*)
   }
+
+  /**
+    * Groups the records of a table by assigning them to windows defined by a time or row interval.
+    *
+    * For streaming tables of infinite size, grouping into windows is required to define finite
+    * groups on which group-based aggregates can be computed.
+    *
+    * For batch tables of finite size, windowing essentially provides shortcuts for time-based
+    * groupBy.
+    *
+    * @param groupWindow group-window that specifies how elements are grouped.
+    * @return A windowed table.
+    */
+  def window(groupWindow: GroupWindow): GroupWindowedTable = {
+    if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
+      throw new ValidationException(s"Windows on batch tables are currently not supported.")
+    }
+    new GroupWindowedTable(table, groupKey, groupWindow)
+  }
+}
+
+class GroupWindowedTable(
+    private[flink] val table: Table,
+    private[flink] val groupKey: Seq[Expression],
+    private[flink] val window: GroupWindow) {
+
+  /**
+    * Performs a selection operation on a windowed table. Similar to an SQL SELECT statement.
+    * The field expressions can contain complex expressions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   groupWindowTable.select('key, 'window.start, 'value.avg + " The average" as 'average)
+    * }}}
+    */
+  def select(fields: Expression*): Table = {
+    val projectionOnAggsAndProps = fields.map(extractAggregationsAndProperties(_, table.tableEnv))
+    val aggregations = projectionOnAggsAndProps.flatMap(_._2)
+    val properties = projectionOnAggsAndProps.flatMap(_._3)
+
+    val groupWindow = window.toLogicalWindow
+
+    val logical =
+      Project(
+        projectionOnAggsAndProps.map(e => UnresolvedAlias(e._1)),
+        WindowAggregate(
+          groupKey,
+          groupWindow,
+          properties,
+          aggregations,
+          table.logicalPlan
+        ).validate(table.tableEnv)
+      ).validate(table.tableEnv)
+
+    new Table(table.tableEnv, logical)
+  }
+
+  /**
+    * Performs a selection operation on a group-windowed table. Similar to an SQL SELECT statement.
+    * The field expressions can contain complex expressions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   groupWindowTable.select("key, window.start, value.avg + ' The average' as average")
+    * }}}
+    */
+  def select(fields: String): Table = {
+    val fieldExprs = ExpressionParser.parseExpressionList(fields)
+    select(fieldExprs: _*)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/InternalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/InternalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/InternalTypeInfo.scala
new file mode 100644
index 0000000..2f896ec
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/InternalTypeInfo.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for internal types of the Table API that are for translation purposes only
+  * and should not be contained in the final plan.
+  */
+@SerialVersionUID(-13064574364925255L)
+abstract class InternalTypeInfo[T](val clazz: Class[T])
+  extends TypeInformation[T]
+  with AtomicType[T] {
+
+  checkNotNull(clazz)
+
+  override def isBasicType: Boolean =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def isTupleType: Boolean =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def getArity: Int =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def getTotalFields: Int =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def getTypeClass: Class[T] = clazz
+
+  override def isKeyType: Boolean =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def createSerializer(config: ExecutionConfig): TypeSerializer[T] =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def createComparator(
+      sortOrderAscending: Boolean,
+      executionConfig: ExecutionConfig)
+    : TypeComparator[T] =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  // ----------------------------------------------------------------------------------------------
+
+  override def hashCode: Int = Objects.hash(clazz)
+
+  def canEqual(obj: Any): Boolean
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: InternalTypeInfo[_] =>
+        other.canEqual(this) && (this.clazz eq other.clazz)
+      case _ =>
+        false
+    }
+  }
+
+  override def toString: String = s"InternalTypeInfo"
+}
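
InternalTypeInfo deliberately throws on every runtime-facing method so that these marker types fail fast if they ever leak into a physical plan; concrete subclasses only provide the class token, canEqual and toString. A hypothetical subclass, sketched to show the pattern (RowIntervalTypeInfo below is the commit's real example):

    // Hedged sketch: a made-up internal marker type following the same pattern.
    @SerialVersionUID(1L)
    class HypotheticalMarkerTypeInfo extends InternalTypeInfo[String](classOf[String]) {

      def canEqual(obj: Any): Boolean = obj.isInstanceOf[HypotheticalMarkerTypeInfo]

      override def toString: String = "HypotheticalMarkerTypeInfo"
    }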

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala
deleted file mode 100644
index bf5cb58..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import java.util.Objects
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.base.{IntComparator, IntSerializer, LongComparator, LongSerializer}
-import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
-import org.apache.flink.api.table.typeutils.IntervalTypeInfo.instantiateComparator
-import org.apache.flink.util.Preconditions._
-
-/**
-  * TypeInformation for SQL INTERVAL types.
-  */
-@SerialVersionUID(-1816179424364825258L)
-class IntervalTypeInfo[T](
-    val clazz: Class[T],
-    val serializer: TypeSerializer[T],
-    val comparatorClass: Class[_ <: TypeComparator[T]])
-  extends TypeInformation[T]
-  with AtomicType[T] {
-
-  checkNotNull(clazz)
-  checkNotNull(serializer)
-  checkNotNull(comparatorClass)
-
-  override def isBasicType: Boolean = false
-
-  override def isTupleType: Boolean = false
-
-  override def getArity: Int = 1
-
-  override def getTotalFields: Int = 1
-
-  override def getTypeClass: Class[T] = clazz
-
-  override def isKeyType: Boolean = true
-
-  override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = serializer
-
-  override def createComparator(
-      sortOrderAscending: Boolean,
-      executionConfig: ExecutionConfig)
-    : TypeComparator[T] = instantiateComparator(comparatorClass, sortOrderAscending)
-
-  // ----------------------------------------------------------------------------------------------
-
-  override def hashCode: Int = Objects.hash(clazz, serializer, comparatorClass)
-
-  def canEqual(obj: Any): Boolean = obj.isInstanceOf[IntervalTypeInfo[_]]
-
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case other: IntervalTypeInfo[_] =>
-        other.canEqual(this) &&
-          (this.clazz eq other.clazz) &&
-          serializer == other.serializer &&
-          (this.comparatorClass eq other.comparatorClass)
-      case _ =>
-        false
-    }
-  }
-
-  override def toString: String = s"IntervalTypeInfo(${clazz.getSimpleName})"
-}
-
-object IntervalTypeInfo {
-
-  val INTERVAL_MONTHS =
-    new IntervalTypeInfo(classOf[java.lang.Integer], IntSerializer.INSTANCE, classOf[IntComparator])
-
-  val INTERVAL_MILLIS =
-    new IntervalTypeInfo(classOf[java.lang.Long], LongSerializer.INSTANCE, classOf[LongComparator])
-
-  // ----------------------------------------------------------------------------------------------
-
-  private def instantiateComparator[X](
-      comparatorClass: Class[_ <: TypeComparator[X]],
-      ascendingOrder: java.lang.Boolean)
-    : TypeComparator[X] = {
-    try {
-      val constructor = comparatorClass.getConstructor(java.lang.Boolean.TYPE)
-      constructor.newInstance(ascendingOrder)
-    } catch {
-      case e: Exception =>
-        throw new RuntimeException(
-          s"Could not initialize comparator ${comparatorClass.getName}", e)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
new file mode 100644
index 0000000..4dc83d0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+/**
+  * TypeInformation for row intervals.
+  */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo extends InternalTypeInfo[Long](classOf[Long]) {
+
+  def canEqual(obj: Any): Boolean = obj.isInstanceOf[RowIntervalTypeInfo]
+
+  override def toString: String = s"RowIntervalTypeInfo"
+}
+
+object RowIntervalTypeInfo {
+
+  val INTERVAL_ROWS = new RowIntervalTypeInfo()
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala
new file mode 100644
index 0000000..b516745
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{IntComparator, IntSerializer, LongComparator, LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for SQL INTERVAL types.
+  */
+@SerialVersionUID(-1816179424364825258L)
+class TimeIntervalTypeInfo[T](
+    val clazz: Class[T],
+    val serializer: TypeSerializer[T],
+    val comparatorClass: Class[_ <: TypeComparator[T]])
+  extends TypeInformation[T]
+  with AtomicType[T] {
+
+  checkNotNull(clazz)
+  checkNotNull(serializer)
+  checkNotNull(comparatorClass)
+
+  override def isBasicType: Boolean = false
+
+  override def isTupleType: Boolean = false
+
+  override def getArity: Int = 1
+
+  override def getTotalFields: Int = 1
+
+  override def getTypeClass: Class[T] = clazz
+
+  override def isKeyType: Boolean = true
+
+  override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = serializer
+
+  override def createComparator(
+      sortOrderAscending: Boolean,
+      executionConfig: ExecutionConfig)
+    : TypeComparator[T] = instantiateComparator(comparatorClass, sortOrderAscending)
+
+  // ----------------------------------------------------------------------------------------------
+
+  override def hashCode: Int = Objects.hash(clazz, serializer, comparatorClass)
+
+  def canEqual(obj: Any): Boolean = obj.isInstanceOf[TimeIntervalTypeInfo[_]]
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: TimeIntervalTypeInfo[_] =>
+        other.canEqual(this) &&
+          (this.clazz eq other.clazz) &&
+          serializer == other.serializer &&
+          (this.comparatorClass eq other.comparatorClass)
+      case _ =>
+        false
+    }
+  }
+
+  override def toString: String = s"TimeIntervalTypeInfo(${clazz.getSimpleName})"
+}
+
+object TimeIntervalTypeInfo {
+
+  val INTERVAL_MONTHS = new TimeIntervalTypeInfo(
+    classOf[java.lang.Integer],
+    IntSerializer.INSTANCE,
+    classOf[IntComparator])
+
+  val INTERVAL_MILLIS = new TimeIntervalTypeInfo(
+    classOf[java.lang.Long],
+    LongSerializer.INSTANCE,
+    classOf[LongComparator])
+
+  // ----------------------------------------------------------------------------------------------
+
+  private def instantiateComparator[X](
+      comparatorClass: Class[_ <: TypeComparator[X]],
+      ascendingOrder: java.lang.Boolean)
+    : TypeComparator[X] = {
+    try {
+      val constructor = comparatorClass.getConstructor(java.lang.Boolean.TYPE)
+      constructor.newInstance(ascendingOrder)
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException(
+          s"Could not initialize comparator ${comparatorClass.getName}", e)
+    }
+  }
+
+}
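
TimeIntervalTypeInfo reuses the basic int/long serializers and creates its comparator reflectively from the given comparator class. A short sketch of obtaining both from the INTERVAL_MILLIS constant (illustration only):

    import org.apache.flink.api.common.ExecutionConfig

    // Hedged sketch: INTERVAL_MILLIS is backed by the long serializer and comparator.
    val config = new ExecutionConfig
    val serializer = TimeIntervalTypeInfo.INTERVAL_MILLIS.createSerializer(config)        // LongSerializer
    val comparator = TimeIntervalTypeInfo.INTERVAL_MILLIS.createComparator(true, config)  // LongComparator via reflection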

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
index 4ff7888..aa8614b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
@@ -30,7 +30,7 @@ object TypeCheckUtils {
   def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
     case _: BasicTypeInfo[_] => false
     case _: SqlTimeTypeInfo[_] => false
-    case _: IntervalTypeInfo[_] => false
+    case _: TimeIntervalTypeInfo[_] => false
     case _ => true
   }
 
@@ -53,7 +53,7 @@ object TypeCheckUtils {
     dataType.isInstanceOf[SqlTimeTypeInfo[_]]
 
   def isTimeInterval(dataType: TypeInformation[_]): Boolean =
-    dataType.isInstanceOf[IntervalTypeInfo[_]]
+    dataType.isInstanceOf[TimeIntervalTypeInfo[_]]
 
   def isString(dataType: TypeInformation[_]): Boolean = dataType == STRING_TYPE_INFO
 
@@ -67,7 +67,7 @@ object TypeCheckUtils {
   def assertNumericExpr(
       dataType: TypeInformation[_],
       caller: String)
-    : ExprValidationResult = dataType match {
+    : ValidationResult = dataType match {
     case _: NumericTypeInfo[_] =>
       ValidationSuccess
     case BIG_DEC_TYPE_INFO =>
@@ -76,7 +76,7 @@ object TypeCheckUtils {
       ValidationFailure(s"$caller requires numeric types, get $dataType here")
   }
 
-  def assertOrderableExpr(dataType: TypeInformation[_], caller: String): ExprValidationResult = {
+  def assertOrderableExpr(dataType: TypeInformation[_], caller: String): ValidationResult = {
     if (dataType.isSortKeyType) {
       ValidationSuccess
     } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
index bb3d060..23154a5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
@@ -45,8 +45,8 @@ object TypeCoercion {
       case (_, BIG_DEC_TYPE_INFO) => Some(BIG_DEC_TYPE_INFO)
       case (BIG_DEC_TYPE_INFO, _) => Some(BIG_DEC_TYPE_INFO)
 
-      case (stti: SqlTimeTypeInfo[_], _: IntervalTypeInfo[_]) => Some(stti)
-      case (_: IntervalTypeInfo[_], stti: SqlTimeTypeInfo[_]) => Some(stti)
+      case (stti: SqlTimeTypeInfo[_], _: TimeIntervalTypeInfo[_]) => Some(stti)
+      case (_: TimeIntervalTypeInfo[_], stti: SqlTimeTypeInfo[_]) => Some(stti)
 
       case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) =>
         val higherIndex = numericWideningPrecedence.lastIndexWhere(t => t == tp1 || t == tp2)
@@ -103,8 +103,8 @@ object TypeCoercion {
     case (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) => true
     case (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) => true
     case (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => true
-    case (INT_TYPE_INFO, IntervalTypeInfo.INTERVAL_MONTHS) => true
-    case (LONG_TYPE_INFO, IntervalTypeInfo.INTERVAL_MILLIS) => true
+    case (INT_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MONTHS) => true
+    case (LONG_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true
 
     case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIME) => false
     case (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.DATE) => false
@@ -113,8 +113,8 @@ object TypeCoercion {
     case (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) => true
     case (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) => true
 
-    case (IntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) => true
-    case (IntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) => true
+    case (TimeIntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) => true
+    case (TimeIntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) => true
 
     case _ => false
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ExprValidationResult.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ExprValidationResult.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ExprValidationResult.scala
deleted file mode 100644
index 8571051..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ExprValidationResult.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.validate
-
-/**
-  * Represents the result of `Expression.validateInput`.
-  */
-sealed trait ExprValidationResult {
-  def isFailure: Boolean = !isSuccess
-  def isSuccess: Boolean
-}
-
-/**
-  * Represents the successful result of `Expression.checkInputDataTypes`.
-  */
-object ValidationSuccess extends ExprValidationResult {
-  val isSuccess: Boolean = true
-}
-
-/**
-  * Represents the failing result of `Expression.checkInputDataTypes`,
-  * with a error message to show the reason of failure.
-  */
-case class ValidationFailure(message: String) extends ExprValidationResult {
-  val isSuccess: Boolean = false
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ValidationResult.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ValidationResult.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ValidationResult.scala
new file mode 100644
index 0000000..5cc7d03
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ValidationResult.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.validate
+
+/**
+  * Represents the result of a validation.
+  */
+sealed trait ValidationResult {
+  def isFailure: Boolean = !isSuccess
+  def isSuccess: Boolean
+
+  /**
+    * Allows constructing a cascade of validation results.
+    * The first failure result will be returned.
+    */
+  def orElse(other: ValidationResult): ValidationResult = {
+    if (isSuccess) {
+      other
+    } else {
+      this
+    }
+  }
+}
+
+/**
+  * Represents the successful result of a validation.
+  */
+object ValidationSuccess extends ValidationResult {
+  val isSuccess: Boolean = true
+}
+
+/**
+  * Represents the failing result of a validation,
+  * with an error message to show the reason of the failure.
+  */
+case class ValidationFailure(message: String) extends ValidationResult {
+  val isSuccess: Boolean = false
+}
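
Compared to the removed ExprValidationResult, the renamed ValidationResult adds orElse for cascading checks: a successful result defers to the next check, while the first failure short-circuits the chain. A small sketch using assertNumericExpr from TypeCheckUtils; the "plus" caller name is just an example:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.table.typeutils.TypeCheckUtils
    import org.apache.flink.api.table.validate.ValidationResult

    // Hedged sketch: validate both operand types; the first ValidationFailure wins.
    def validateBinaryNumeric(left: TypeInformation[_], right: TypeInformation[_]): ValidationResult =
      TypeCheckUtils.assertNumericExpr(left, "plus")
        .orElse(TypeCheckUtils.assertNumericExpr(right, "plus"))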

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
new file mode 100644
index 0000000..32d67d7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.api.table.plan.logical._
+
+/**
+  * A group-window specification.
+  *
+  * Group-windows group rows based on time or row-count intervals and are therefore essentially a
+  * special type of groupBy. Just like groupBy, group-windows allow computing aggregates
+  * on groups of elements.
+  *
+  * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping
+  * is required to apply aggregations on streaming tables.
+  *
+  * For finite batch tables, group-windows provide shortcuts for time-based groupBy.
+  *
+  */
+trait GroupWindow {
+
+  /**
+    * Converts an API class to a logical window for planning.
+    */
+  private[flink] def toLogicalWindow: LogicalWindow
+}
+
+/**
+  * A group-window operating on event-time.
+  *
+  * @param timeField defines the time mode for streaming tables. For batch tables it defines the
+  *                  time attribute on which rows are grouped.
+  */
+abstract class EventTimeWindow(timeField: Expression) extends GroupWindow {
+
+  protected var name: Option[Expression] = None
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: Expression): EventTimeWindow = {
+    this.name = Some(alias)
+    this
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias))
+}
+
+// ------------------------------------------------------------------------------------------------
+// Tumbling group-windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+  * Tumbling group-window.
+  *
+  * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
+  * grouped by processing-time.
+  *
+  * @param size the size of the window either as time or row-count interval.
+  */
+class TumblingWindow(size: Expression) extends GroupWindow {
+
+  /**
+    * Tumbling group-window.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * @param size the size of the window either as time or row-count interval.
+    */
+  def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+  private var alias: Option[Expression] = None
+
+  /**
+    * Specifies the time attribute on which rows are grouped.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a tumbling group-window on event-time
+    */
+  def on(timeField: Expression): TumblingEventTimeWindow =
+    new TumblingEventTimeWindow(alias, timeField, size)
+
+  /**
+    * Specifies the time attribute on which rows are grouped.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a tumbling group-window on event-time
+    */
+  def on(timeField: String): TumblingEventTimeWindow =
+    on(ExpressionParser.parseExpression(timeField))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: Expression): TumblingWindow = {
+    this.alias = Some(alias)
+    this
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): TumblingWindow = as(ExpressionParser.parseExpression(alias))
+
+  override private[flink] def toLogicalWindow: LogicalWindow =
+    ProcessingTimeTumblingGroupWindow(alias, size)
+}
+
+/**
+  * Tumbling group-window on event-time.
+  */
+class TumblingEventTimeWindow(
+    alias: Option[Expression],
+    time: Expression,
+    size: Expression)
+  extends EventTimeWindow(time) {
+
+  override private[flink] def toLogicalWindow: LogicalWindow =
+    EventTimeTumblingGroupWindow(name.orElse(alias), time, size)
+}
+
+// ------------------------------------------------------------------------------------------------
+// Sliding group windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+  * Partially specified sliding window.
+  *
+  * @param size the size of the window either as time or row-count interval.
+  */
+class SlideWithSize(size: Expression) {
+
+  /**
+    * Partially specified sliding window.
+    *
+    * @param size the size of the window either as time or row-count interval.
+    */
+  def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+  /**
+    * Specifies the window's slide as time or row-count interval.
+    *
+    * The slide determines the interval in which windows are started. Hence, sliding windows can
+    * overlap if the slide is smaller than the size of the window.
+    *
+    * For example, you could have windows of size 15 minutes that slide by 3 minutes. With this,
+    * 15 minutes' worth of elements are grouped every 3 minutes and each row contributes to 5
+    * windows.
+    *
+    * @param slide the slide of the window either as time or row-count interval.
+    * @return a sliding group-window
+    */
+  def every(slide: Expression): SlidingWindow = new SlidingWindow(size, slide)
+
+  /**
+    * Specifies the window's slide as time or row-count interval.
+    *
+    * The slide determines the interval in which windows are started. Hence, sliding windows can
+    * overlap if the slide is smaller than the size of the window.
+    *
+    * For example, you could have windows of size 15 minutes that slide by 3 minutes. With this,
+    * 15 minutes' worth of elements are grouped every 3 minutes and each row contributes to 5
+    * windows.
+    *
+    * @param slide the slide of the window either as time or row-count interval.
+    * @return a sliding group-window
+    */
+  def every(slide: String): SlidingWindow = every(ExpressionParser.parseExpression(slide))
+}
+
+/**
+  * Sliding group-window.
+  *
+  * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
+  * grouped by processing-time.
+  *
+  * @param size the size of the window either as time or row-count interval.
+  */
+class SlidingWindow(
+    size: Expression,
+    slide: Expression)
+  extends GroupWindow {
+
+  private var alias: Option[Expression] = None
+
+  /**
+    * Specifies the time attribute on which rows are grouped.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a sliding group-window on event-time
+    */
+  def on(timeField: Expression): SlidingEventTimeWindow =
+    new SlidingEventTimeWindow(alias, timeField, size, slide)
+
+  /**
+    * Specifies the time attribute on which rows are grouped.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a sliding group-window on event-time
+    */
+  def on(timeField: String): SlidingEventTimeWindow =
+    on(ExpressionParser.parseExpression(timeField))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: Expression): SlidingWindow = {
+    this.alias = Some(alias)
+    this
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): SlidingWindow = as(ExpressionParser.parseExpression(alias))
+
+  override private[flink] def toLogicalWindow: LogicalWindow =
+    ProcessingTimeSlidingGroupWindow(alias, size, slide)
+}
+
+/**
+  * Sliding group-window on event-time.
+  */
+class SlidingEventTimeWindow(
+    alias: Option[Expression],
+    timeField: Expression,
+    size: Expression,
+    slide: Expression)
+  extends EventTimeWindow(timeField) {
+
+  override private[flink] def toLogicalWindow: LogicalWindow =
+    EventTimeSlidingGroupWindow(name.orElse(alias), timeField, size, slide)
+}
+
+// ------------------------------------------------------------------------------------------------
+// Session group windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+  * Session group-window.
+  *
+  * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
+  * grouped by processing-time.
+  *
+  * @param gap the time interval of inactivity before a window is closed.
+  */
+class SessionWindow(gap: Expression) extends GroupWindow {
+
+  /**
+    * Session group-window.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * @param gap the time interval of inactivity before a window is closed.
+    */
+  def this(gap: String) = this(ExpressionParser.parseExpression(gap))
+
+  private var alias: Option[Expression] = None
+
+  /**
+    * Specifies the time attribute on which rows are grouped.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a session group-window on event-time
+    */
+  def on(timeField: Expression): SessionEventTimeWindow =
+    new SessionEventTimeWindow(alias, timeField, gap)
+
+  /**
+    * Specifies the time attribute on which rows are grouped.
+    *
+    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
+    * are grouped by processing-time.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a session group-window on event-time
+    */
+  def on(timeField: String): SessionEventTimeWindow =
+    on(ExpressionParser.parseExpression(timeField))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: Expression): SessionWindow = {
+    this.alias = Some(alias)
+    this
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): SessionWindow = as(ExpressionParser.parseExpression(alias))
+
+  override private[flink] def toLogicalWindow: LogicalWindow =
+    ProcessingTimeSessionGroupWindow(alias, gap)
+}
+
+/**
+  * Session group-window on event-time.
+  */
+class SessionEventTimeWindow(
+    alias: Option[Expression],
+    timeField: Expression,
+    gap: Expression)
+  extends EventTimeWindow(timeField) {
+
+  override private[flink] def toLogicalWindow: LogicalWindow =
+    EventTimeSessionGroupWindow(name.orElse(alias), timeField, gap)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
index 2d662d6..294cba2 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
@@ -118,7 +118,7 @@ public class TableEnvironmentITCase extends TableProgramsTestBase {
 		tableEnv.registerDataSet("MyTable", ds2);
 	}
 
-	@Test(expected = ValidationException.class)
+	@Test(expected = TableException.class)
 	public void testScanUnregisteredTable() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -146,7 +146,7 @@ public class TableEnvironmentITCase extends TableProgramsTestBase {
 		compareResultAsText(results, expected);
 	}
 
-	@Test(expected = ValidationException.class)
+	@Test(expected = TableException.class)
 	public void testIllegalName() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -157,7 +157,7 @@ public class TableEnvironmentITCase extends TableProgramsTestBase {
 		tableEnv.registerTable("_DataSetTable_42", t);
 	}
 
-	@Test(expected = ValidationException.class)
+	@Test(expected = TableException.class)
 	public void testRegisterTableFromOtherEnv() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv1 = TableEnvironment.getTableEnvironment(env, config());

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
index 2d82dbc..4c07615 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
@@ -91,7 +91,7 @@ class TableEnvironmentITCase(
     tEnv.registerDataSet("MyTable", ds2)
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test(expected = classOf[TableException])
   def testScanUnregisteredTable(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -132,7 +132,7 @@ class TableEnvironmentITCase(
     tEnv.registerDataSet("MyTable", t2)
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test(expected = classOf[TableException])
   def testRegisterTableFromOtherEnv(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
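
A minimal Scala sketch of the behaviour these updated expectations encode, assuming scan() is the
lookup under test (the method body is an illustrative assumption, only the annotation change is
taken from the diffs above):

    @Test(expected = classOf[TableException])
    def testScanUnregisteredTable(): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val tEnv = TableEnvironment.getTableEnvironment(env, config)
      // Looking up a table name that was never registered is now reported as a
      // TableException rather than a ValidationException.
      tEnv.scan("nonRegisteredTable")
    }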

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
index 8a2612e..c3758a4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit.Assert._
@@ -163,9 +163,9 @@ class CalcITCase(
 
     try {
       CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
-      fail("ValidationException expected")
+      fail("TableException expected")
     } catch {
-      case _: ValidationException => //ignore
+      case _: TableException => //ignore
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
new file mode 100644
index 0000000..6bb513e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
+import org.apache.flink.api.scala.stream.utils.StreamITCase
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.{Row, _}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * We only test some aggregations until better testing of constructed DataStream
+  * programs is possible.
+  */
+class AggregationsITCase extends StreamingMultipleProgramsTestBase {
+
+  val data = List(
+    (1L, 1, "Hi"),
+    (2L, 2, "Hello"),
+    (4L, 2, "Hello"),
+    (8L, 3, "Hello world"),
+    (16L, 3, "Hello world"))
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 2.rows every 1.rows)
+      .select('string, 'int.count)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("Hello world,1", "Hello world,2", "Hello,1", "Hello,2", "Hi,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Session withGap 7.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows)
+      .select('int.count)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("2", "2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeTumblingWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
+      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
+      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02",
+      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025",
+      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}
+
+object GroupWindowITCase {
+  class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
+
+    override def checkAndGetNextWatermark(
+        lastElement: (Long, Int, String),
+        extractedTimestamp: Long)
+      : Watermark = {
+      new Watermark(extractedTimestamp)
+    }
+
+    override def extractTimestamp(
+        element: (Long, Int, String),
+        previousElementTimestamp: Long): Long = {
+      element._1
+    }
+  }
+}
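
The expected start/end timestamps in testEventTimeTumblingWindow above follow from the usual
epoch-aligned assignment of tumbling windows. A minimal sketch of that derivation, assuming a
zero window offset and non-negative timestamps as in the test data:

    // Start of the tumbling window a given timestamp falls into.
    def tumblingWindowStart(timestamp: Long, size: Long): Long =
      timestamp - (timestamp % size)

    // With the 5 ms windows of the test:
    //   timestamps 1, 2, 4 -> window [0, 5)
    //   timestamp  8       -> window [5, 10)
    //   timestamp  16      -> window [15, 20)
    // which matches the printed bounds 00:00:00.0/00:00:00.005, 00:00:00.005/00:00:00.01
    // and 00:00:00.015/00:00:00.02 in the expected results.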

