flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [10/44] flink git commit: [FLINK-6617][table] Improve JAVA and SCALA logical plans consistent test
Date Thu, 13 Jul 2017 10:18:19 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
index a16688e..791f778 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
@@ -19,57 +19,22 @@
 package org.apache.flink.table.api.scala.stream.table
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Test
-import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.OverAgg0
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
 
 class GroupAggregationsTest extends TableTestBase {
 
-  /**
-    * OVER clause is necessary for [[OverAgg0]] window function.
-    */
-  @Test(expected = classOf[ValidationException])
-  def testOverAggregation(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
-    val overAgg = new OverAgg0
-    table.select(overAgg('a, 'b))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupingOnNonExistentField(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
-
-    val ds = table
-             // must fail. '_foo is not a valid field
-             .groupBy('_foo)
-             .select('a.avg)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupingInvalidSelection(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
-
-    val ds = table
-             .groupBy('a, 'b)
-             // must fail. 'c is not a grouping key or aggregation
-             .select('c)
-  }
-
   @Test
-  def testGroupbyWithoutWindow() = {
+  def testGroupAggregate() = {
     val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
 
     val resultTable = table
-                      .groupBy('b)
-                      .select('a.count)
+      .groupBy('b)
+      .select('a.count)
 
     val expected =
       unaryNode(
@@ -96,9 +61,9 @@ class GroupAggregationsTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
 
     val resultTable = table
-            .select('a, 4 as 'four, 'b)
-            .groupBy('four, 'a)
-            .select('four, 'b.sum)
+      .select('a, 4 as 'four, 'b)
+      .groupBy('four, 'a)
+      .select('four, 'b.sum)
 
     val expected =
       unaryNode(
@@ -124,9 +89,9 @@ class GroupAggregationsTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
 
     val resultTable = table
-            .select('b, 4 as 'four, 'a)
-            .groupBy('b, 'four)
-            .select('four, 'a.sum)
+      .select('b, 4 as 'four, 'a)
+      .groupBy('b, 'four)
+      .select('four, 'a.sum)
 
     val expected =
       unaryNode(
@@ -152,9 +117,9 @@ class GroupAggregationsTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
 
     val resultTable = table
-            .select('a as 'a, 'b % 3 as 'd, 'c as 'c)
-            .groupBy('d)
-            .select('c.min, 'a.avg)
+      .select('a as 'a, 'b % 3 as 'd, 'c as 'c)
+      .groupBy('d)
+      .select('c.min, 'a.avg)
 
     val expected =
       unaryNode(
@@ -180,9 +145,9 @@ class GroupAggregationsTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
 
     val resultTable = table
-            .groupBy('b)
-            .select('b, 'a.sum)
-            .where('b === 2)
+      .groupBy('b)
+      .select('b, 'a.sum)
+      .where('b === 2)
 
     val expected =
       unaryNode(
@@ -205,8 +170,8 @@ class GroupAggregationsTest extends TableTestBase {
     val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
 
     val resultTable = table
-            .groupBy('b)
-            .select('b, 'a.cast(BasicTypeInfo.DOUBLE_TYPE_INFO).avg)
+      .groupBy('b)
+      .select('b, 'a.cast(BasicTypeInfo.DOUBLE_TYPE_INFO).avg)
 
     val expected =
       unaryNode(

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
index d78aea6..92c8522 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
 import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.table.GroupWindowAggregationsITCase.TimestampAndWatermarkWithOffset
-import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.runtime.datastream.StreamITCase
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import org.apache.flink.types.Row
 import org.junit.Assert._

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsTest.scala
new file mode 100644
index 0000000..6be0e13
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsTest.scala
@@ -0,0 +1,785 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.WindowReference
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
+import org.junit.{Ignore, Test}
+
+class GroupWindowAggregationsTest extends TableTestBase {
+
+  @Test
+  def testMultiWindow(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+    val windowedTable = table
+      .window(Tumble over 50.milli on 'proctime as 'w1)
+      .groupBy('w1, 'string)
+      .select('w1.proctime as 'proctime, 'string, 'int.count)
+      .window(Slide over 20.milli every 10.milli on 'proctime as 'w2)
+      .groupBy('w2)
+      .select('string.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "string", "int", "proctime")
+          ),
+          term("groupBy", "string"),
+          term(
+            "window",
+            TumblingGroupWindow(
+              WindowReference("w1"),
+              'proctime,
+              50.milli)),
+          term("select", "string", "COUNT(int) AS TMP_1", "proctime('w1) AS TMP_0")
+        ),
+        term("select", "string", "TMP_0 AS proctime")
+      ),
+      term(
+        "window",
+        SlidingGroupWindow(
+          WindowReference("w2"),
+          'proctime,
+          20.milli,
+          10.milli)),
+      term("select", "COUNT(string) AS TMP_2")
+    )
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+    val windowedTable = table
+      .window(Tumble over 50.milli on 'proctime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "proctime")
+      ),
+      term("groupBy", "string"),
+      term(
+        "window",
+        TumblingGroupWindow(
+          WindowReference("w"),
+          'proctime,
+          50.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows on 'proctime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "proctime")
+      ),
+      term("groupBy", "string"),
+      term(
+        "window",
+        TumblingGroupWindow(WindowReference("w"), 'proctime, 2.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term(
+        "window",
+        TumblingGroupWindow(
+          WindowReference("w"),
+          'long,
+          5.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeTumblingGroupWindowWithUdAgg(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val weightedAvg = new WeightedAvgWithMerge
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, weightedAvg('long, 'int))
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term(
+        "window",
+        TumblingGroupWindow(
+          WindowReference("w"),
+          'rowtime,
+          5.milli)),
+      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+    val windowedTable = table
+      .window(Slide over 50.milli every 50.milli on 'proctime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "proctime")
+      ),
+      term("groupBy", "string"),
+      term(
+        "window",
+        SlidingGroupWindow(
+          WindowReference("w"),
+          'proctime,
+          50.milli,
+          50.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "proctime")
+      ),
+      term("groupBy", "string"),
+      term(
+        "window",
+        SlidingGroupWindow(
+          WindowReference("w"),
+          'proctime,
+          2.rows,
+          1.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "rowtime")
+      ),
+      term("groupBy", "string"),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamGroupWindowAggregate
+  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term(
+        "window",
+        SlidingGroupWindow(
+          WindowReference("w"),
+          'long,
+          8.milli,
+          10.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowWithUdAgg(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val weightedAvg = new WeightedAvgWithMerge
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, weightedAvg('long, 'int))
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
+      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    val windowedTable = table
+      .window(Session withGap 7.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeSessionGroupWindowWithUdAgg(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val weightedAvg = new WeightedAvgWithMerge
+
+    val windowedTable = table
+      .window(Session withGap 7.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, weightedAvg('long, 'int))
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)),
+      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+    val windowedTable = table
+      .window(Tumble over 50.milli on 'proctime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "proctime")
+      ),
+      term("groupBy", "string"),
+      term(
+        "window",
+        TumblingGroupWindow(
+          WindowReference("w"),
+          'proctime,
+          50.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows on 'proctime as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int", "proctime")
+      ),
+      term(
+        "window",
+        TumblingGroupWindow(
+          WindowReference("w"),
+          'proctime,
+          2.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int", "rowtime")
+      ),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamGroupWindowAggregate
+  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int", "long")
+      ),
+      term(
+        "window",
+        TumblingGroupWindow(
+          WindowReference("w"),
+          'long,
+          5.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+    val windowedTable = table
+      .window(Slide over 50.milli every 50.milli on 'proctime as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int", "proctime")
+      ),
+      term(
+        "window",
+        SlidingGroupWindow(
+          WindowReference("w"),
+          'proctime,
+          50.milli,
+          50.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int", "proctime")
+      ),
+      term(
+        "window",
+        SlidingGroupWindow(
+          WindowReference("w"),
+          'proctime,
+          2.rows,
+          1.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int", "rowtime")
+      ),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+//  @Ignore // see comments in DataStreamGroupWindowAggregate
+  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'long as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int", "long")
+      ),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeSessionGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    val windowedTable = table
+      .window(Session withGap 7.milli on 'long as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int", "long")
+      ),
+      term(
+        "window",
+        SessionGroupWindow(
+          WindowReference("w"),
+          'long,
+          7.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testTumbleWindowStartEnd(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "rowtime")
+      ),
+      term("groupBy", "string"),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)),
+      term("select",
+        "string",
+        "COUNT(int) AS TMP_0",
+        "start('w) AS TMP_1",
+        "end('w) AS TMP_2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testSlidingWindowWithUDAF(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String, Int, Int)](
+      'long,
+      'int,
+      'string,
+      'int2,
+      'int3,
+      'proctime.proctime)
+
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+      .groupBy('w, 'int2, 'int3, 'string)
+      .select(weightAvgFun('long, 'int))
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          streamTableNode(0),
+          term("groupBy", "string, int2, int3"),
+          term("window", SlidingGroupWindow(WindowReference("w"), 'proctime,  2.rows, 1.rows)),
+          term(
+            "select",
+            "string",
+            "int2",
+            "int3",
+            "WeightedAvg(long, int) AS TMP_0")
+        ),
+        term("select","TMP_0")
+      )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testSlideWindowStartEnd(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "rowtime")
+      ),
+      term("groupBy", "string"),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 10.milli, 5.milli)),
+      term("select",
+        "string",
+        "COUNT(int) AS TMP_0",
+        "start('w) AS TMP_1",
+        "end('w) AS TMP_2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testSessionWindowStartWithTwoEnd(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    val windowedTable = table
+      .window(Session withGap 3.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamGroupWindowAggregate",
+        streamTableNode(0),
+        term("groupBy", "string"),
+        term("window", SessionGroupWindow(WindowReference("w"), 'long, 3.milli)),
+        term("select",
+          "string",
+          "COUNT(int) AS TMP_1",
+          "end('w) AS TMP_0",
+          "start('w) AS TMP_2")
+      ),
+      term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testTumbleWindowWithDuplicateAggsAndProps(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 5.millis on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2,
+        'w.end as 'x3, 'w.end)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamGroupWindowAggregate",
+        streamTableNode(0),
+        term("groupBy", "string"),
+        term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.millis)),
+        term("select",
+          "string",
+          "SUM(int) AS TMP_0",
+          "start('w) AS TMP_1",
+          "end('w) AS TMP_2")
+      ),
+      term("select",
+        "string",
+        "+(CAST(TMP_0), 1) AS s1",
+        "+(CAST(TMP_0), 3) AS s2",
+        "TMP_1 AS x",
+        "TMP_1 AS x2",
+        "TMP_2 AS x3",
+        "TMP_2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
deleted file mode 100644
index 593b036..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ /dev/null
@@ -1,963 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvg, WeightedAvgWithMerge}
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.WindowReference
-import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
-import org.junit.{Ignore, Test}
-
-class GroupWindowTest extends TableTestBase {
-
-  /**
-    * OVER clause is necessary for [[OverAgg0]] window function.
-    */
-  @Test(expected = classOf[ValidationException])
-  def testOverAggregation(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val overAgg = new OverAgg0
-    table
-      .window(Tumble over 2.rows on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select(overAgg('long, 'int))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidWindowProperty(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .groupBy('string)
-      .select('string, 'string.start) // property in non windowed table
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupByWithoutWindowAlias(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('string)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidRowTimeRef(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-      .window(Slide over 5.milli every 1.milli on 'int as 'w2) // 'Int  does not exist in input.
-      .groupBy('w2)
-      .select('string)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidTumblingSize(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    table
-      .window(Tumble over "WRONG" on 'long as 'w) // string is not a valid interval
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testTumbleUdAggWithInvalidArgs(): Unit = {
-    val util = streamTestUtil()
-    val weightedAvg = new WeightedAvgWithMerge
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Tumble over 2.hours on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, weightedAvg('string, 'int)) // invalid UDAGG args
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidSlidingSize(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    table
-      .window(Slide over "WRONG" every "WRONG" on 'long as 'w) // string is not a valid interval
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidSlidingSlide(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    table
-      // row and time intervals may not be mixed
-      .window(Slide over 12.rows every 1.minute on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSlideUdAggWithInvalidArgs(): Unit = {
-    val util = streamTestUtil()
-    val weightedAvg = new WeightedAvgWithMerge
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Slide over 2.hours every 30.minutes on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, weightedAvg('string, 'int)) // invalid UDAGG args
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidSessionGap(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    table
-      // row interval is not valid for session windows
-      .window(Session withGap 10.rows on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidWindowAlias1(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    table
-      .window(Session withGap 100.milli on 'long as 1 + 1) // expression instead of a symbol
-      .groupBy('string)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidWindowAlias2(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    table
-      // field name "string" is already present
-      .window(Session withGap 100.milli on 'long as 'string)
-      .groupBy('string)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSessionUdAggWithInvalidArgs(): Unit = {
-    val util = streamTestUtil()
-    val weightedAvg = new WeightedAvgWithMerge
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    table
-      .window(Session withGap 2.hours on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, weightedAvg('string, 'int)) // invalid UDAGG args
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidWindowPropertyOnRowCountsTumblingWindow(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    table
-    .window(Tumble over 2.rows on 'proctime as 'w)
-    .groupBy('w, 'string)
-    .select('string, 'w.start, 'w.end) // invalid start/end on rows-count window
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidWindowPropertyOnRowCountsSlidingWindow(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    table
-    .window(Slide over 10.rows every 5.rows on 'proctime as 'w)
-    .groupBy('w, 'string)
-    .select('string, 'w.start, 'w.end) // invalid start/end on rows-count window
-  }
-
-  @Test
-  def testMultiWindow(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Tumble over 50.milli on 'proctime as 'w1)
-      .groupBy('w1, 'string)
-      .select('w1.proctime as 'proctime, 'string, 'int.count)
-      .window(Slide over 20.milli every 10.milli on 'proctime as 'w2)
-      .groupBy('w2)
-      .select('string.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamGroupWindowAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "string", "int", "proctime")
-          ),
-          term("groupBy", "string"),
-          term(
-            "window",
-            TumblingGroupWindow(
-              WindowReference("w1"),
-              'proctime,
-              50.milli)),
-          term("select", "string", "COUNT(int) AS TMP_1", "proctime('w1) AS TMP_0")
-        ),
-        term("select", "string", "TMP_0 AS proctime")
-      ),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w2"),
-          'proctime,
-          20.milli,
-          10.milli)),
-      term("select", "COUNT(string) AS TMP_2")
-    )
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Tumble over 50.milli on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "proctime")
-      ),
-      term("groupBy", "string"),
-      term(
-        "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          50.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Tumble over 2.rows on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "proctime")
-      ),
-      term("groupBy", "string"),
-      term(
-        "window",
-        TumblingGroupWindow(WindowReference("w"), 'proctime, 2.rows)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term(
-        "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'long,
-          5.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeTumblingGroupWindowWithUdAgg(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val weightedAvg = new WeightedAvgWithMerge
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, weightedAvg('long, 'int))
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term(
-        "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'rowtime,
-          5.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testProcessingTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Slide over 50.milli every 50.milli on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "proctime")
-      ),
-      term("groupBy", "string"),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          50.milli,
-          50.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "proctime")
-      ),
-      term("groupBy", "string"),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          2.rows,
-          1.rows)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "rowtime")
-      ),
-      term("groupBy", "string"),
-      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  @Ignore // see comments in DataStreamGroupWindowAggregate
-  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w"),
-          'long,
-          8.milli,
-          10.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowWithUdAgg(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val weightedAvg = new WeightedAvgWithMerge
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, weightedAvg('long, 'int))
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSessionGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Session withGap 7.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSessionGroupWindowWithUdAgg(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val weightedAvg = new WeightedAvgWithMerge
-
-    val windowedTable = table
-      .window(Session withGap 7.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, weightedAvg('long, 'int))
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term("window", SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Tumble over 50.milli on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "proctime")
-      ),
-      term("groupBy", "string"),
-      term(
-        "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          50.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Tumble over 2.rows on 'proctime as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "proctime")
-      ),
-      term(
-        "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          2.rows)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "rowtime")
-      ),
-      term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  @Ignore // see comments in DataStreamGroupWindowAggregate
-  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "long")
-      ),
-      term(
-        "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'long,
-          5.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllProcessingTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Slide over 50.milli every 50.milli on 'proctime as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "proctime")
-      ),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          50.milli,
-          50.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "proctime")
-      ),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          2.rows,
-          1.rows)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "rowtime")
-      ),
-      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-//  @Ignore // see comments in DataStreamGroupWindowAggregate
-  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'long as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "long")
-      ),
-      term("window", SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeSessionGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Session withGap 7.milli on 'long as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "long")
-      ),
-      term(
-        "window",
-        SessionGroupWindow(
-          WindowReference("w"),
-          'long,
-          7.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testTumbleWindowStartEnd(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "rowtime")
-      ),
-      term("groupBy", "string"),
-      term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)),
-      term("select",
-        "string",
-        "COUNT(int) AS TMP_0",
-        "start('w) AS TMP_1",
-        "end('w) AS TMP_2")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testSlidingWindowWithUDAF(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String, Int, Int)](
-      'long,
-      'int,
-      'string,
-      'int2,
-      'int3,
-      'proctime.proctime)
-
-    val weightAvgFun = new WeightedAvg
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
-      .groupBy('w, 'int2, 'int3, 'string)
-      .select(weightAvgFun('long, 'int))
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamGroupWindowAggregate",
-          streamTableNode(0),
-          term("groupBy", "string, int2, int3"),
-          term("window", SlidingGroupWindow(WindowReference("w"), 'proctime,  2.rows, 1.rows)),
-          term(
-            "select",
-            "string",
-            "int2",
-            "int3",
-            "WeightedAvg(long, int) AS TMP_0")
-        ),
-        term("select","TMP_0")
-      )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testSlideWindowStartEnd(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "rowtime")
-      ),
-      term("groupBy", "string"),
-      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 10.milli, 5.milli)),
-      term("select",
-        "string",
-        "COUNT(int) AS TMP_0",
-        "start('w) AS TMP_1",
-        "end('w) AS TMP_2")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testSessionWindowStartWithTwoEnd(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Session withGap 3.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamGroupWindowAggregate",
-        streamTableNode(0),
-        term("groupBy", "string"),
-        term("window", SessionGroupWindow(WindowReference("w"), 'long, 3.milli)),
-        term("select",
-          "string",
-          "COUNT(int) AS TMP_1",
-          "end('w) AS TMP_0",
-          "start('w) AS TMP_2")
-      ),
-      term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testTumbleWindowWithDuplicateAggsAndProps(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.millis on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2,
-        'w.end as 'x3, 'w.end)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamGroupWindowAggregate",
-        streamTableNode(0),
-        term("groupBy", "string"),
-        term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.millis)),
-        term("select",
-          "string",
-          "SUM(int) AS TMP_0",
-          "start('w) AS TMP_1",
-          "end('w) AS TMP_2")
-      ),
-      term("select",
-        "string",
-        "+(CAST(TMP_0), 1) AS s1",
-        "+(CAST(TMP_0), 3) AS s2",
-        "TMP_1 AS x",
-        "TMP_1 AS x2",
-        "TMP_2 AS x3",
-        "TMP_2")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
deleted file mode 100644
index 6f9aad2..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvg
-import org.apache.flink.table.api.java.utils.UserDefinedScalarFunctions.JavaFunc0
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.table.OverWindowITCase.RowTimeSourceFunction
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.table.functions.aggfunctions.CountAggFunction
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class OverWindowITCase extends StreamingWithStateTestBase {
-
-  @Test
-  def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
-
-    val data = List(
-      (1L, 1, "Hello"),
-      (2L, 2, "Hello"),
-      (3L, 3, "Hello"),
-      (4L, 4, "Hello"),
-      (5L, 5, "Hello"),
-      (6L, 6, "Hello"),
-      (7L, 7, "Hello World"),
-      (8L, 8, "Hello World"),
-      (20L, 20, "Hello World"))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    StreamITCase.clear
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-    val countFun = new CountAggFunction
-    val weightAvgFun = new WeightedAvg
-
-    val windowedTable = table
-      .window(
-        Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
-      .select('c, countFun('b) over 'w as 'mycount, weightAvgFun('a, 'b) over 'w as 'wAvg)
-      .select('c, 'mycount, 'wAvg)
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq(
-      "Hello World,1,7", "Hello World,2,7", "Hello World,3,14",
-      "Hello,1,1", "Hello,2,1", "Hello,3,2", "Hello,4,3", "Hello,5,3", "Hello,6,4")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.testResults = mutable.MutableList()
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (1, 1L, "Hello")),
-      Left(14000002L, (1, 2L, "Hello")),
-      Left(14000002L, (1, 3L, "Hello world")),
-      Left(14000003L, (2, 2L, "Hello world")),
-      Left(14000003L, (2, 3L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 4L, "Hello world")),
-      Left(14000022L, (1, 5L, "Hello world")),
-      Left(14000022L, (1, 6L, "Hello world")),
-      Left(14000022L, (1, 7L, "Hello world")),
-      Left(14000023L, (2, 4L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Right(14000030L)
-    )
-    val table = env
-      .addSource(new RowTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-    val countFun = new CountAggFunction
-    val weightAvgFun = new WeightedAvg
-    val plusOne = new JavaFunc0
-
-    val windowedTable = table
-      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following
-         CURRENT_RANGE as 'w)
-      .select(
-        'a, 'b, 'c,
-        'b.sum over 'w,
-        "SUM:".toExpr + ('b.sum over 'w),
-        countFun('b) over 'w,
-        (countFun('b) over 'w) + 1,
-        plusOne(countFun('b) over 'w),
-        array('b.avg over 'w, 'b.max over 'w),
-        'b.avg over 'w,
-        'b.max over 'w,
-        'b.min over 'w,
-        ('b.min over 'w).abs(),
-        weightAvgFun('b, 'a) over 'w)
-
-    val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,1,Hello,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
-      "1,2,Hello,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
-      "1,3,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
-      "1,1,Hi,7,SUM:7,4,5,5,[1, 3],1,3,1,1,1",
-      "2,1,Hello,1,SUM:1,1,2,2,[1, 1],1,1,1,1,1",
-      "2,2,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
-      "2,3,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
-      "1,4,Hello world,11,SUM:11,5,6,6,[2, 4],2,4,1,1,2",
-      "1,5,Hello world,29,SUM:29,8,9,9,[3, 7],3,7,1,1,3",
-      "1,6,Hello world,29,SUM:29,8,9,9,[3, 7],3,7,1,1,3",
-      "1,7,Hello world,29,SUM:29,8,9,9,[3, 7],3,7,1,1,3",
-      "2,4,Hello world,15,SUM:15,5,6,6,[3, 5],3,5,1,1,3",
-      "2,5,Hello world,15,SUM:15,5,6,6,[3, 5],3,5,1,1,3"
-    )
-
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testProcTimeBoundedPartitionedRowsOver(): Unit = {
-
-    val data = List(
-      (1, 1L, 0, "Hallo", 1L),
-      (2, 2L, 1, "Hallo Welt", 2L),
-      (2, 3L, 2, "Hallo Welt wie", 1L),
-      (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
-      (3, 5L, 4, "ABC", 2L),
-      (3, 6L, 5, "BCD", 3L),
-      (4, 7L, 6, "CDE", 2L),
-      (4, 8L, 7, "DEF", 1L),
-      (4, 9L, 8, "EFG", 1L),
-      (4, 10L, 9, "FGH", 2L),
-      (5, 11L, 10, "GHI", 1L),
-      (5, 12L, 11, "HIJ", 3L),
-      (5, 13L, 12, "IJK", 3L),
-      (5, 14L, 13, "JKL", 2L),
-      (5, 15L, 14, "KLM", 2L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(1)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
-      .select('a, 'c.sum over 'w, 'c.min over 'w)
-    val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,0,0",
-      "2,1,1",
-      "2,3,1",
-      "3,3,3",
-      "3,7,3",
-      "3,12,3",
-      "4,6,6",
-      "4,13,6",
-      "4,21,6",
-      "4,30,6",
-      "5,10,10",
-      "5,21,10",
-      "5,33,10",
-      "5,46,10",
-      "5,60,10")
-
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeBoundedPartitionedRowOver(): Unit = {
-    val data = Seq(
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((3L, (7L, 7, "Hello World"))),
-      Left((1L, (7L, 7, "Hello World"))),
-      Left((1L, (7L, 7, "Hello World"))),
-      Right(2L),
-      Left((3L, (3L, 3, "Hello"))),
-      Left((4L, (4L, 4, "Hello"))),
-      Left((5L, (5L, 5, "Hello"))),
-      Left((6L, (6L, 6, "Hello"))),
-      Left((20L, (20L, 20, "Hello World"))),
-      Right(6L),
-      Left((8L, (8L, 8, "Hello World"))),
-      Left((7L, (7L, 7, "Hello World"))),
-      Right(20L))
-
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val table = env.addSource[(Long, Int, String)](
-      new RowTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
-      .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
-
-    val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
-      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
-      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
-      "Hello,6,3,15",
-      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
-      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
-    val data = Seq(
-      Left((1500L, (1L, 15, "Hello"))),
-      Left((1600L, (1L, 16, "Hello"))),
-      Left((1000L, (1L, 1, "Hello"))),
-      Left((2000L, (2L, 2, "Hello"))),
-      Right(1000L),
-      Left((2000L, (2L, 2, "Hello"))),
-      Left((2000L, (2L, 3, "Hello"))),
-      Left((3000L, (3L, 3, "Hello"))),
-      Right(2000L),
-      Left((4000L, (4L, 4, "Hello"))),
-      Right(3000L),
-      Left((5000L, (5L, 5, "Hello"))),
-      Right(5000L),
-      Left((6000L, (6L, 6, "Hello"))),
-      Left((6500L, (6L, 65, "Hello"))),
-      Right(7000L),
-      Left((9000L, (6L, 9, "Hello"))),
-      Left((9500L, (6L, 18, "Hello"))),
-      Left((9000L, (6L, 9, "Hello"))),
-      Right(10000L),
-      Left((10000L, (7L, 7, "Hello World"))),
-      Left((11000L, (7L, 17, "Hello World"))),
-      Left((11000L, (7L, 77, "Hello World"))),
-      Right(12000L),
-      Left((14000L, (7L, 18, "Hello World"))),
-      Right(14000L),
-      Left((15000L, (8L, 8, "Hello World"))),
-      Right(17000L),
-      Left((20000L, (20L, 20, "Hello World"))),
-      Right(19000L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val table = env.addSource[(Long, Int, String)](
-      new RowTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(
-        Over partitionBy 'c orderBy 'rowtime preceding 1.seconds following CURRENT_RANGE as 'w)
-      .select('c, 'b, 'a.count over 'w, 'a.sum over 'w)
-
-    val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
-      "Hello,3,4,9",
-      "Hello,4,2,7",
-      "Hello,5,2,9",
-      "Hello,6,2,11", "Hello,65,2,12",
-      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
-      "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
-      "Hello World,8,2,15",
-      "Hello World,20,1,20")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}
-
-object OverWindowITCase {
-
-  class RowTimeSourceFunction[T](
-      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
-    override def run(ctx: SourceContext[T]): Unit = {
-      dataWithTimestampList.foreach {
-        case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
-        case Right(w) => ctx.emitWatermark(new Watermark(w))
-      }
-    }
-
-    override def cancel(): Unit = ???
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
index e571bae..e11b804 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
@@ -19,9 +19,9 @@ package org.apache.flink.table.api.scala.stream.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithRetract
-import org.apache.flink.table.api.{Table, ValidationException}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils.Func1
+import org.apache.flink.table.api.Table
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.junit.Test
@@ -31,92 +31,6 @@ class OverWindowTest extends TableTestBase {
   val table: Table = streamUtil.addTable[(Int, String, Long)]("MyTable",
     'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
 
-  @Test(expected = classOf[ValidationException])
-  def testInvalidWindowAlias(): Unit = {
-    val result = table
-      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
-      .select('c, 'b.count over 'x)
-    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testOrderBy(): Unit = {
-    val result = table
-      .window(Over partitionBy 'c orderBy 'abc preceding 2.rows as 'w)
-      .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testPrecedingAndFollowingUsingIsLiteral(): Unit = {
-    val result = table
-      .window(Over partitionBy 'c orderBy 'rowtime preceding 2 following "xx" as 'w)
-      .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testPrecedingAndFollowingUsingSameType(): Unit = {
-    val result = table
-      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_RANGE as 'w)
-      .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testPartitionByWithUnresolved(): Unit = {
-    val result = table
-      .window(Over partitionBy 'a + 'b orderBy 'rowtime preceding 2.rows as 'w)
-      .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testPartitionByWithNotKeyType(): Unit = {
-    val table2 = streamUtil.addTable[(Int, String, Either[Long, String])]("MyTable2", 'a, 'b, 'c)
-
-    val result = table2
-      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
-      .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testPrecedingValue(): Unit = {
-    val result = table
-      .window(Over orderBy 'rowtime preceding -1.rows as 'w)
-      .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testFollowingValue(): Unit = {
-    val result = table
-      .window(Over orderBy 'rowtime preceding 1.rows following -2.rows as 'w)
-      .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUdAggWithInvalidArgs(): Unit = {
-    val weightedAvg = new WeightedAvgWithRetract
-
-    val result = table
-      .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
-      .select('c, weightedAvg('b, 'a) over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
-  }
-
-  @Test
-  def testAccessesWindowProperties(): Unit = {
-    thrown.expect(classOf[ValidationException])
-    thrown.expectMessage("Window start and end properties are not available for Over windows.")
-
-    table
-    .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
-    .select('c, 'a.count over 'w, 'w.start, 'w.end)
-  }
-
   @Test
   def testScalarFunctionsOnOverWindow() = {
     val weightedAvg = new WeightedAvgWithRetract
@@ -676,9 +590,6 @@ class OverWindowTest extends TableTestBase {
 
     streamUtil.verifyTable(result, expected)
   }
-
 }
 
-object OverWindowTest{
-  case class Pojo(id: Long, name: String)
-}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/TableSourceTest.scala
new file mode 100644
index 0000000..a446c84
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/TableSourceTest.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil.{term, unaryNode}
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class TableSourceTest extends TableTestBase {
+
+  @Test
+  def testRowTimeTableSourceSimple(): Unit = {
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource("rowTimeT", new TestRowtimeSource("addTime"))
+
+    val t = util.tableEnv.scan("rowTimeT").select("addTime, id, name, val")
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
+        term("select", "TIME_MATERIALIZATION(addTime) AS addTime", "id", "name", "val")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testRowTimeTableSourceGroupWindow(): Unit = {
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource("rowTimeT", new TestRowtimeSource("addTime"))
+
+    val t = util.tableEnv.scan("rowTimeT")
+      .filter("val > 100")
+      .window(Tumble over 10.minutes on 'addTime as 'w)
+      .groupBy('name, 'w)
+      .select('name, 'w.end, 'val.avg)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
+            term("select", "name", "val", "addTime"),
+            term("where", ">(val, 100)")
+          ),
+          term("groupBy", "name"),
+          term("window", "TumblingGroupWindow('w, 'addTime, 600000.millis)"),
+          term("select", "name", "AVG(val) AS TMP_1", "end('w) AS TMP_0")
+        ),
+        term("select", "name", "TMP_0", "TMP_1")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testProcTimeTableSourceSimple(): Unit = {
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
+
+    val t = util.tableEnv.scan("procTimeT").select("pTime, id, name, val")
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
+        term("select", "TIME_MATERIALIZATION(pTime) AS pTime", "id", "name", "val")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testProcTimeTableSourceOverWindow(): Unit = {
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
+
+    val t = util.tableEnv.scan("procTimeT")
+      .window(Over partitionBy 'id orderBy 'pTime preceding 2.hours as 'w)
+      .select('id, 'name, 'val.sum over 'w as 'valSum)
+      .filter('valSum > 100)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
+          term("partitionBy", "id"),
+          term("orderBy", "pTime"),
+          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+          term("select", "id", "val", "name", "pTime", "SUM(val) AS w0$o0")
+        ),
+        term("select", "id", "name", "w0$o0 AS valSum"),
+        term("where", ">(w0$o0, 100)")
+      )
+    util.verifyTable(t, expected)
+  }
+
+
+}
+
+class TestRowtimeSource(timeField: String)
+    extends StreamTableSource[Row] with DefinedRowtimeAttribute {
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+
+  override def getRowtimeAttribute: String = timeField
+
+  override def getReturnType: TypeInformation[Row] = {
+    new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "val", "name"))
+  }
+}
+
+class TestProctimeSource(timeField: String)
+    extends StreamTableSource[Row] with DefinedProctimeAttribute {
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+
+  override def getProctimeAttribute: String = timeField
+
+  override def getReturnType: TypeInformation[Row] = {
+    new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "val", "name"))
+  }
+}
+
+


Mime
View raw message