flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [36/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
Date Thu, 13 Jul 2017 10:18:45 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
deleted file mode 100644
index cc9d786..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Test
-
-class WindowAggregateTest extends TableTestBase {
-  private val streamUtil: StreamTableTestUtil = streamTestUtil()
-  streamUtil.addTable[(Int, String, Long)](
-    "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
-
-  @Test
-  def testGroupbyWithoutWindow() = {
-    val sql = "SELECT COUNT(a) FROM MyTable GROUP BY b"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamGroupAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "b", "a")
-          ),
-          term("groupBy", "b"),
-          term("select", "b", "COUNT(a) AS EXPR$0")
-        ),
-        term("select", "EXPR$0")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testTumbleFunction() = {
-    streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
-
-    val sql =
-      "SELECT " +
-        "  COUNT(*), weightedAvg(c, a) AS wAvg, " +
-        "  TUMBLE_START(rowtime, INTERVAL '15' MINUTE), " +
-        "  TUMBLE_END(rowtime, INTERVAL '15' MINUTE)" +
-        "FROM MyTable " +
-        "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-    val expected =
-      unaryNode(
-        "DataStreamGroupWindowAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "rowtime", "c", "a")
-        ),
-        term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-        term("select",
-          "COUNT(*) AS EXPR$0, " +
-            "weightedAvg(c, a) AS wAvg, " +
-            "start('w$) AS w$start, " +
-            "end('w$) AS w$end")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testHoppingFunction() = {
-    streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
-
-    val sql =
-      "SELECT COUNT(*), weightedAvg(c, a) AS wAvg, " +
-        "  HOP_START(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR), " +
-        "  HOP_END(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR) " +
-        "FROM MyTable " +
-        "GROUP BY HOP(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
-    val expected =
-      unaryNode(
-        "DataStreamGroupWindowAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "proctime", "c", "a")
-        ),
-        term("window", SlidingGroupWindow('w$, 'proctime, 3600000.millis, 900000.millis)),
-        term("select",
-          "COUNT(*) AS EXPR$0, " +
-            "weightedAvg(c, a) AS wAvg, " +
-            "start('w$) AS w$start, " +
-            "end('w$) AS w$end")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testSessionFunction() = {
-    streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
-
-    val sql =
-      "SELECT " +
-        "  COUNT(*), weightedAvg(c, a) AS wAvg, " +
-        "  SESSION_START(proctime, INTERVAL '15' MINUTE), " +
-        "  SESSION_END(proctime, INTERVAL '15' MINUTE) " +
-        "FROM MyTable " +
-        "GROUP BY SESSION(proctime, INTERVAL '15' MINUTE)"
-    val expected =
-      unaryNode(
-        "DataStreamGroupWindowAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "proctime", "c", "a")
-        ),
-        term("window", SessionGroupWindow('w$, 'proctime, 900000.millis)),
-        term("select",
-          "COUNT(*) AS EXPR$0, " +
-            "weightedAvg(c, a) AS wAvg, " +
-            "start('w$) AS w$start, " +
-            "end('w$) AS w$end")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testExpressionOnWindowAuxFunction() = {
-    val sql =
-      "SELECT " +
-        "  COUNT(*), " +
-        "  TUMBLE_END(rowtime, INTERVAL '15' MINUTE) + INTERVAL '1' MINUTE " +
-        "FROM MyTable " +
-        "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamGroupWindowAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "rowtime")
-          ),
-          term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-          term("select", "COUNT(*) AS EXPR$0", "start('w$) AS w$start", "end('w$) AS w$end")
-        ),
-        term("select", "EXPR$0", "DATETIME_PLUS(w$end, 60000) AS $f1")
-      )
-
-    streamUtil.verifySql(sql, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/AggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/AggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/AggregationsValidationTest.scala
deleted file mode 100644
index b1efc09..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/AggregationsValidationTest.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql.validation
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.table.api.{Types, ValidationException}
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.OverAgg0
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
-import org.apache.flink.table.expressions.AggFunctionCall
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.apache.flink.types.Row
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Ignore, Test}
-
-class AggregationsValidationTest extends TableTestBase {
-  private val streamUtil: StreamTableTestUtil = streamTestUtil()
-  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
-
-  /**
-    * OVER clause is necessary for [[OverAgg0]] window function.
-    */
-  @Test(expected = classOf[ValidationException])
-  def testOverAggregation(): Unit = {
-    streamUtil.addFunction("overAgg", new OverAgg0)
-
-    val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
-
-    streamUtil.tableEnv.sql(sqlQuery)
-  }
-
-  @Test
-  def testDistinct(): Unit = {
-    val sql = "SELECT DISTINCT a, b, c FROM MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamGroupAggregate",
-        streamTableNode(0),
-        term("groupBy", "a, b, c"),
-        term("select", "a, b, c")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  // TODO: this query should be optimized to only have a single DataStreamGroupAggregate
-  // TODO: reopen this until FLINK-7144 fixed
-  @Ignore
-  @Test
-  def testDistinctAfterAggregate(): Unit = {
-    val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
-
-    val expected =
-      unaryNode(
-        "DataStreamGroupAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "a")
-        ),
-        term("groupBy", "a"),
-        term("select", "a")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-
-  @Test
-  def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = {
-    streamUtil.addFunction("udag", new MyAgg)
-    val call = streamUtil
-      .tEnv
-      .functionCatalog
-      .lookupFunction("udag", Seq())
-      .asInstanceOf[AggFunctionCall]
-
-    val typeInfo = call.accTypeInfo
-    assertTrue(typeInfo.isInstanceOf[CaseClassTypeInfo[_]])
-    assertEquals(2, typeInfo.getTotalFields)
-    val caseTypeInfo = typeInfo.asInstanceOf[CaseClassTypeInfo[_]]
-    assertEquals(Types.LONG, caseTypeInfo.getTypeAt(0))
-    assertEquals(Types.LONG, caseTypeInfo.getTypeAt(1))
-
-    streamUtil.addFunction("udag2", new MyAgg2)
-    val call2 = streamUtil
-      .tEnv
-      .functionCatalog
-      .lookupFunction("udag2", Seq())
-      .asInstanceOf[AggFunctionCall]
-
-    val typeInfo2 = call2.accTypeInfo
-    assertTrue(s"actual type: $typeInfo2", typeInfo2.isInstanceOf[RowTypeInfo])
-    assertEquals(2, typeInfo2.getTotalFields)
-    val rowTypeInfo = typeInfo2.asInstanceOf[RowTypeInfo]
-    assertEquals(Types.LONG, rowTypeInfo.getTypeAt(0))
-    assertEquals(Types.INT, rowTypeInfo.getTypeAt(1))
-  }
-}
-
-case class MyAccumulator(var sum: Long, var count: Long)
-
-class MyAgg extends AggregateFunction[Long, MyAccumulator] {
-
-  //Overloaded accumulate method
-  def accumulate(acc: MyAccumulator, value: Long): Unit = {
-  }
-
-  override def createAccumulator(): MyAccumulator = MyAccumulator(0, 0)
-
-  override def getValue(accumulator: MyAccumulator): Long = 1L
-}
-
-class MyAgg2 extends AggregateFunction[Long, Row] {
-
-  def accumulate(acc: Row, value: Long): Unit = {}
-
-  override def createAccumulator(): Row = new Row(2)
-
-  override def getValue(accumulator: Row): Long = 1L
-
-  def getAccumulatorType: TypeInformation[_] = new RowTypeInfo(Types.LONG, Types.INT)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/OverWindowValidationTest.scala
deleted file mode 100644
index 3cc13f4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/OverWindowValidationTest.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.apache.flink.types.Row
-import org.junit.Test
-
-class OverWindowValidationTest extends TableTestBase {
-  private val streamUtil: StreamTableTestUtil = streamTestUtil()
-  streamUtil.addTable[(Int, String, Long)]("T1", 'a, 'b, 'c, 'proctime.proctime)
-
-  /**
-    * All aggregates must be computed on the same window.
-    */
-  @Test(expected = classOf[TableException])
-  def testMultiWindow(): Unit = {
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
-      "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
-      "from T1"
-
-    streamUtil.tableEnv.sql(sqlQuery).toAppendStream[Row]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/WindowAggregateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/WindowAggregateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/WindowAggregateValidationTest.scala
deleted file mode 100644
index 0fd8740..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/WindowAggregateValidationTest.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Test
-
-class WindowAggregateValidationTest extends TableTestBase {
-  private val streamUtil: StreamTableTestUtil = streamTestUtil()
-  streamUtil.addTable[(Int, String, Long)](
-    "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
-
-  /**
-    * OVER clause is necessary for [[OverAgg0]] window function.
-    */
-  @Test(expected = classOf[ValidationException])
-  def testOverAggregation(): Unit = {
-    streamUtil.addFunction("overAgg", new OverAgg0)
-
-    val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
-    streamUtil.tableEnv.sql(sqlQuery)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testTumbleWindowNoOffset(): Unit = {
-    val sqlQuery =
-      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
-        "FROM MyTable " +
-        "GROUP BY TUMBLE(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
-
-    streamUtil.verifySql(sqlQuery, "n/a")
-  }
-
-  @Test(expected = classOf[TableException])
-  def testHopWindowNoOffset(): Unit = {
-    val sqlQuery =
-      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
-        "FROM MyTable " +
-        "GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
-
-    streamUtil.verifySql(sqlQuery, "n/a")
-  }
-
-  @Test(expected = classOf[TableException])
-  def testSessionWindowNoOffset(): Unit = {
-    val sqlQuery =
-      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
-        "FROM MyTable " +
-        "GROUP BY SESSION(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
-
-    streamUtil.verifySql(sqlQuery, "n/a")
-  }
-
-  @Test(expected = classOf[TableException])
-  def testVariableWindowSize() = {
-    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, c * INTERVAL '1' MINUTE)"
-    streamUtil.verifySql(sql, "n/a")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testWindowUdAggInvalidArgs(): Unit = {
-    streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
-
-    val sqlQuery =
-      "SELECT SUM(a) AS sumA, weightedAvg(a, b) AS wAvg " +
-        "FROM MyTable " +
-        "GROUP BY TUMBLE(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')"
-
-    streamUtil.verifySql(sqlQuery, "n/a")
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExplainTest.scala
deleted file mode 100644
index 20a88fe..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExplainTest.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.junit.Assert.assertEquals
-import org.junit._
-
-class ExplainTest
-  extends StreamingMultipleProgramsTestBase {
-
-  val testFilePath = this.getClass.getResource("/").getFile
-
-  @Test
-  def testFilter(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements((1, "hello"))
-      .toTable(tEnv, 'a, 'b)
-      .filter("a % 2 = 0")
-
-    val result = replaceString(tEnv.explain(table))
-
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilterStream0.out").mkString
-    val expect = replaceString(source)
-    assertEquals(result, expect)
-  }
-
-  @Test
-  def testUnion(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table = table1.unionAll(table2)
-
-    val result = replaceString(tEnv.explain(table))
-
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnionStream0.out").mkString
-    val expect = replaceString(source)
-    assertEquals(result, expect)
-  }
-
-  def replaceString(s: String): String = {
-    /* Stage {id} is ignored, because id keeps incrementing in test class
-     * while StreamExecutionEnvironment is up
-     */
-    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExpressionReductionTest.scala
deleted file mode 100644
index fec25eb..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
-  @Test
-  def testReduceCalcExpression(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .where('a > (1 + 7))
-      .select((3 + 4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor(),
-              true.cast(Types.STRING) + "X")
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select",
-        "13 AS _c0",
-        "'b' AS _c1",
-        "'STRING' AS _c2",
-        "'teststring' AS _c3",
-        "1990-10-24 23:00:01.123 AS _c4",
-        "false AS _c5",
-        "true AS _c6",
-        "2E0 AS _c7",
-        "'trueX' AS _c8"
-      ),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceProjectExpression(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result =  table
-      .select((3 + 4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor(),
-              true.cast(Types.STRING) + "X")
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select",
-        "13 AS _c0",
-        "'b' AS _c1",
-        "'STRING' AS _c2",
-        "'teststring' AS _c3",
-        "1990-10-24 23:00:01.123 AS _c4",
-        "false AS _c5",
-        "true AS _c6",
-        "2E0 AS _c7",
-        "'trueX' AS _c8"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceFilterExpression(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .where('a > (1 + 7))
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/FieldProjectionTest.scala
deleted file mode 100644
index 3bf9c33..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/FieldProjectionTest.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.{Upper, WindowReference}
-import org.apache.flink.table.plan.logical.TumblingGroupWindow
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{TableTestBase, _}
-import org.junit.Test
-
-/**
-  * Tests for all the situations when we can do fields projection. Like selecting few fields
-  * from a large field count source.
-  */
-class FieldProjectionTest extends TableTestBase {
-
-  val streamUtil: StreamTableTestUtil = streamTestUtil()
-
-  @Test
-  def testSelectFromWindow(): Unit = {
-    val sourceTable = streamUtil
-      .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)
-    val resultTable = sourceTable
-        .window(Tumble over 5.millis on 'rowtime as 'w)
-        .groupBy('w)
-        .select(Upper('c).count, 'a.sum)
-
-    val expected =
-      unaryNode(
-        "DataStreamGroupWindowAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "c", "a", "rowtime", "UPPER(c) AS $f3")
-        ),
-        term("window",
-          TumblingGroupWindow(
-            WindowReference("w"),
-            'rowtime,
-            5.millis)),
-        term("select", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1")
-      )
-
-    streamUtil.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testSelectFromGroupedWindow(): Unit = {
-    val sourceTable = streamUtil
-      .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)
-    val resultTable = sourceTable
-        .window(Tumble over 5.millis on 'rowtime as 'w)
-        .groupBy('w, 'b)
-        .select(Upper('c).count, 'a.sum, 'b)
-
-    val expected = unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamGroupWindowAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "c", "a", "b", "rowtime", "UPPER(c) AS $f4")
-          ),
-          term("groupBy", "b"),
-          term("window",
-            TumblingGroupWindow(
-              WindowReference("w"),
-              'rowtime,
-              5.millis)),
-          term("select", "b", "COUNT($f4) AS TMP_0", "SUM(a) AS TMP_1")
-        ),
-        term("select", "TMP_0", "TMP_1", "b")
-    )
-
-    streamUtil.verifyTable(resultTable, expected)
-  }
-
-}
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
deleted file mode 100644
index 791f778..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class GroupAggregationsTest extends TableTestBase {
-
-  @Test
-  def testGroupAggregate() = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
-
-    val resultTable = table
-      .groupBy('b)
-      .select('a.count)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamGroupAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "b")
-          ),
-          term("groupBy", "b"),
-          term("select", "b", "COUNT(a) AS TMP_0")
-        ),
-        term("select", "TMP_0")
-      )
-    util.verifyTable(resultTable, expected)
-  }
-
-
-  @Test
-  def testGroupAggregateWithConstant1(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
-
-    val resultTable = table
-      .select('a, 4 as 'four, 'b)
-      .groupBy('four, 'a)
-      .select('four, 'b.sum)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamGroupAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "4 AS four", "b", "a")
-          ),
-          term("groupBy", "four", "a"),
-          term("select", "four", "a", "SUM(b) AS TMP_0")
-        ),
-        term("select", "4 AS four", "TMP_0")
-      )
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testGroupAggregateWithConstant2(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
-
-    val resultTable = table
-      .select('b, 4 as 'four, 'a)
-      .groupBy('b, 'four)
-      .select('four, 'a.sum)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamGroupAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "4 AS four", "a", "b")
-          ),
-          term("groupBy", "four", "b"),
-          term("select", "four", "b", "SUM(a) AS TMP_0")
-        ),
-        term("select", "4 AS four", "TMP_0")
-      )
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testGroupAggregateWithExpressionInSelect(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
-
-    val resultTable = table
-      .select('a as 'a, 'b % 3 as 'd, 'c as 'c)
-      .groupBy('d)
-      .select('c.min, 'a.avg)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamGroupAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "MOD(b, 3) AS d", "c")
-          ),
-          term("groupBy", "d"),
-          term("select", "d", "MIN(c) AS TMP_0", "AVG(a) AS TMP_1")
-        ),
-        term("select", "TMP_0", "TMP_1")
-      )
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testGroupAggregateWithFilter(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
-
-    val resultTable = table
-      .groupBy('b)
-      .select('b, 'a.sum)
-      .where('b === 2)
-
-    val expected =
-      unaryNode(
-        "DataStreamGroupAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "b", "a"),
-          term("where", "=(b, 2)")
-        ),
-        term("groupBy", "b"),
-        term("select", "b", "SUM(a) AS TMP_0")
-      )
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testGroupAggregateWithAverage(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
-
-    val resultTable = table
-      .groupBy('b)
-      .select('b, 'a.cast(BasicTypeInfo.DOUBLE_TYPE_INFO).avg)
-
-    val expected =
-      unaryNode(
-        "DataStreamGroupAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "b", "a", "CAST(a) AS a0")
-        ),
-        term("groupBy", "b"),
-        term("select", "b", "AVG(a0) AS TMP_0")
-      )
-
-    util.verifyTable(resultTable, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
deleted file mode 100644
index 92c8522..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.table.GroupWindowAggregationsITCase.TimestampAndWatermarkWithOffset
-import org.apache.flink.table.runtime.datastream.StreamITCase
-import org.apache.flink.table.functions.aggfunctions.CountAggFunction
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-/**
-  * We only test some aggregations until better testing of constructed DataStream
-  * programs is possible.
-  */
-class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
-  private val queryConfig = new StreamQueryConfig()
-  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-  val data = List(
-    (1L, 1, "Hi"),
-    (2L, 2, "Hello"),
-    (4L, 2, "Hello"),
-    (8L, 3, "Hello world"),
-    (16L, 3, "Hello world"))
-
-  @Test
-  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
-
-    val countFun = new CountAggFunction
-    val weightAvgFun = new WeightedAvg
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select('string, countFun('int), 'int.avg,
-              weightAvgFun('long, 'int), weightAvgFun('int, 'int))
-
-    val results = windowedTable.toAppendStream[Row](queryConfig)
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq("Hello world,1,3,8,3", "Hello world,2,3,12,3", "Hello,1,2,2,2",
-                       "Hello,2,2,3,2", "Hi,1,1,1,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeSessionGroupWindowOverTime(): Unit = {
-    //To verify the "merge" functionality, we create this test with the following characteristics:
-    // 1. set the Parallelism to 1, and have the test data out of order
-    // 2. create a waterMark with 10ms offset to delay the window emission by 10ms
-    val sessionWindowTestdata = List(
-      (1L, 1, "Hello"),
-      (2L, 2, "Hello"),
-      (8L, 8, "Hello"),
-      (9L, 9, "Hello World"),
-      (4L, 4, "Hello"),
-      (16L, 16, "Hello"))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val countFun = new CountAggFunction
-    val weightAvgFun = new WeightedAvgWithMerge
-
-    val stream = env
-      .fromCollection(sessionWindowTestdata)
-      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(10L))
-    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Session withGap 5.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, countFun('int), 'int.avg,
-              weightAvgFun('long, 'int), weightAvgFun('int, 'int))
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq("Hello World,1,9,9,9", "Hello,1,16,16,16", "Hello,4,3,5,5")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
-    val countFun = new CountAggFunction
-    val weightAvgFun = new WeightedAvg
-
-    val windowedTable = table
-      .window(Tumble over 2.rows on 'proctime as 'w)
-      .groupBy('w)
-      .select(countFun('string), 'int.avg,
-              weightAvgFun('long, 'int), weightAvgFun('int, 'int))
-
-    val results = windowedTable.toAppendStream[Row](queryConfig)
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq("2,1,1,1", "2,2,6,2")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeTumblingWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data)
-      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(0L))
-    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
-    val countFun = new CountAggFunction
-    val weightAvgFun = new WeightedAvg
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int),
-              weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq(
-      "Hello world,1,3,8,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
-      "Hello world,1,3,16,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
-      "Hello,2,2,3,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
-      "Hi,1,1,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testGroupWindowWithoutKeyInProjection(): Unit = {
-    val data = List(
-      (1L, 1, "Hi", 1, 1),
-      (2L, 2, "Hello", 2, 2),
-      (4L, 2, "Hello", 2, 2),
-      (8L, 3, "Hello world", 3, 3),
-      (16L, 3, "Hello world", 3, 3))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, 'proctime.proctime)
-
-    val weightAvgFun = new WeightedAvg
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
-      .groupBy('w, 'int2, 'int3, 'string)
-      .select(weightAvgFun('long, 'int))
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq("12", "8", "2", "3", "1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}
-
-object GroupWindowAggregationsITCase {
-  class TimestampAndWatermarkWithOffset(
-    offset: Long) extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
-
-    override def checkAndGetNextWatermark(
-        lastElement: (Long, Int, String),
-        extractedTimestamp: Long)
-      : Watermark = {
-      new Watermark(extractedTimestamp - offset)
-    }
-
-    override def extractTimestamp(
-        element: (Long, Int, String),
-        previousElementTimestamp: Long): Long = {
-      element._1
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsTest.scala
deleted file mode 100644
index 6be0e13..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsTest.scala
+++ /dev/null
@@ -1,785 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.WindowReference
-import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
-import org.junit.{Ignore, Test}
-
-class GroupWindowAggregationsTest extends TableTestBase {
-
-  @Test
-  def testMultiWindow(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Tumble over 50.milli on 'proctime as 'w1)
-      .groupBy('w1, 'string)
-      .select('w1.proctime as 'proctime, 'string, 'int.count)
-      .window(Slide over 20.milli every 10.milli on 'proctime as 'w2)
-      .groupBy('w2)
-      .select('string.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamGroupWindowAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "string", "int", "proctime")
-          ),
-          term("groupBy", "string"),
-          term(
-            "window",
-            TumblingGroupWindow(
-              WindowReference("w1"),
-              'proctime,
-              50.milli)),
-          term("select", "string", "COUNT(int) AS TMP_1", "proctime('w1) AS TMP_0")
-        ),
-        term("select", "string", "TMP_0 AS proctime")
-      ),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w2"),
-          'proctime,
-          20.milli,
-          10.milli)),
-      term("select", "COUNT(string) AS TMP_2")
-    )
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Tumble over 50.milli on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "proctime")
-      ),
-      term("groupBy", "string"),
-      term(
-        "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          50.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Tumble over 2.rows on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "proctime")
-      ),
-      term("groupBy", "string"),
-      term(
-        "window",
-        TumblingGroupWindow(WindowReference("w"), 'proctime, 2.rows)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term(
-        "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'long,
-          5.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeTumblingGroupWindowWithUdAgg(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val weightedAvg = new WeightedAvgWithMerge
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, weightedAvg('long, 'int))
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term(
-        "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'rowtime,
-          5.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testProcessingTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Slide over 50.milli every 50.milli on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "proctime")
-      ),
-      term("groupBy", "string"),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          50.milli,
-          50.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "proctime")
-      ),
-      term("groupBy", "string"),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          2.rows,
-          1.rows)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "rowtime")
-      ),
-      term("groupBy", "string"),
-      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  @Ignore // see comments in DataStreamGroupWindowAggregate
-  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w"),
-          'long,
-          8.milli,
-          10.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowWithUdAgg(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val weightedAvg = new WeightedAvgWithMerge
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, weightedAvg('long, 'int))
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSessionGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Session withGap 7.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSessionGroupWindowWithUdAgg(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val weightedAvg = new WeightedAvgWithMerge
-
-    val windowedTable = table
-      .window(Session withGap 7.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, weightedAvg('long, 'int))
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term("window", SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Tumble over 50.milli on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "proctime")
-      ),
-      term("groupBy", "string"),
-      term(
-        "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          50.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Tumble over 2.rows on 'proctime as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "proctime")
-      ),
-      term(
-        "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          2.rows)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "rowtime")
-      ),
-      term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  @Ignore // see comments in DataStreamGroupWindowAggregate
-  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "long")
-      ),
-      term(
-        "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'long,
-          5.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllProcessingTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Slide over 50.milli every 50.milli on 'proctime as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "proctime")
-      ),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          50.milli,
-          50.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "proctime")
-      ),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          2.rows,
-          1.rows)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "rowtime")
-      ),
-      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-//  @Ignore // see comments in DataStreamGroupWindowAggregate
-  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'long as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "long")
-      ),
-      term("window", SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeSessionGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Session withGap 7.milli on 'long as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int", "long")
-      ),
-      term(
-        "window",
-        SessionGroupWindow(
-          WindowReference("w"),
-          'long,
-          7.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testTumbleWindowStartEnd(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "rowtime")
-      ),
-      term("groupBy", "string"),
-      term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)),
-      term("select",
-        "string",
-        "COUNT(int) AS TMP_0",
-        "start('w) AS TMP_1",
-        "end('w) AS TMP_2")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testSlidingWindowWithUDAF(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String, Int, Int)](
-      'long,
-      'int,
-      'string,
-      'int2,
-      'int3,
-      'proctime.proctime)
-
-    val weightAvgFun = new WeightedAvg
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
-      .groupBy('w, 'int2, 'int3, 'string)
-      .select(weightAvgFun('long, 'int))
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamGroupWindowAggregate",
-          streamTableNode(0),
-          term("groupBy", "string, int2, int3"),
-          term("window", SlidingGroupWindow(WindowReference("w"), 'proctime,  2.rows, 1.rows)),
-          term(
-            "select",
-            "string",
-            "int2",
-            "int3",
-            "WeightedAvg(long, int) AS TMP_0")
-        ),
-        term("select","TMP_0")
-      )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testSlideWindowStartEnd(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val expected = unaryNode(
-      "DataStreamGroupWindowAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int", "rowtime")
-      ),
-      term("groupBy", "string"),
-      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 10.milli, 5.milli)),
-      term("select",
-        "string",
-        "COUNT(int) AS TMP_0",
-        "start('w) AS TMP_1",
-        "end('w) AS TMP_2")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testSessionWindowStartWithTwoEnd(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Session withGap 3.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamGroupWindowAggregate",
-        streamTableNode(0),
-        term("groupBy", "string"),
-        term("window", SessionGroupWindow(WindowReference("w"), 'long, 3.milli)),
-        term("select",
-          "string",
-          "COUNT(int) AS TMP_1",
-          "end('w) AS TMP_0",
-          "start('w) AS TMP_2")
-      ),
-      term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testTumbleWindowWithDuplicateAggsAndProps(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.millis on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2,
-        'w.end as 'x3, 'w.end)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamGroupWindowAggregate",
-        streamTableNode(0),
-        term("groupBy", "string"),
-        term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.millis)),
-        term("select",
-          "string",
-          "SUM(int) AS TMP_0",
-          "start('w) AS TMP_1",
-          "end('w) AS TMP_2")
-      ),
-      term("select",
-        "string",
-        "+(CAST(TMP_0), 1) AS s1",
-        "+(CAST(TMP_0), 3) AS s2",
-        "TMP_1 AS x",
-        "TMP_1 AS x2",
-        "TMP_2 AS x3",
-        "TMP_2")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
deleted file mode 100644
index e11b804..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
+++ /dev/null
@@ -1,595 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithRetract
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.Func1
-import org.apache.flink.table.api.Table
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Test
-
-class OverWindowTest extends TableTestBase {
-  private val streamUtil: StreamTableTestUtil = streamTestUtil()
-  val table: Table = streamUtil.addTable[(Int, String, Long)]("MyTable",
-    'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
-
-  @Test
-  def testScalarFunctionsOnOverWindow() = {
-    val weightedAvg = new WeightedAvgWithRetract
-    val plusOne = Func1
-
-    val result = table
-      .window(Over partitionBy 'b orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
-      .select(
-        plusOne('a.sum over 'w as 'wsum) as 'd,
-        ('a.count over 'w).exp(),
-        (weightedAvg('c, 'a) over 'w) + 1,
-        "AVG:".toExpr + (weightedAvg('c, 'a) over 'w),
-        array(weightedAvg('c, 'a) over 'w, 'a.count over 'w))
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "b", "c", "proctime")
-          ),
-          term("partitionBy", "b"),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "b", "c", "proctime",
-               "SUM(a) AS w0$o0",
-               "COUNT(a) AS w0$o1",
-               "WeightedAvgWithRetract(c, a) AS w0$o2")
-        ),
-        term("select",
-             s"${plusOne.functionIdentifier}(w0$$o0) AS d",
-             "EXP(CAST(w0$o1)) AS _c1",
-             "+(w0$o2, 1) AS _c2",
-             "||('AVG:', CAST(w0$o2)) AS _c3",
-             "ARRAY(w0$o2, w0$o1) AS _c4")
-      )
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testProcTimeBoundedPartitionedRowsOver() = {
-    val weightedAvg = new WeightedAvgWithRetract
-
-    val result = table
-      .window(Over partitionBy 'b orderBy 'proctime preceding 2.rows following CURRENT_ROW as 'w)
-      .select('c, weightedAvg('c, 'a) over 'w)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "b", "c", "proctime")
-          ),
-          term("partitionBy", "b"),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "b", "c", "proctime", "WeightedAvgWithRetract(c, a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS _c1")
-      )
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testProcTimeBoundedPartitionedRangeOver() = {
-    val weightedAvg = new WeightedAvgWithRetract
-
-    val result = table
-      .window(
-        Over partitionBy 'a orderBy 'proctime preceding 2.hours following CURRENT_RANGE as 'w)
-      .select('a, weightedAvg('c, 'a) over 'w as 'myAvg)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("partitionBy", "a"),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
-          term(
-            "select",
-            "a",
-            "c",
-            "proctime",
-            "WeightedAvgWithRetract(c, a) AS w0$o0"
-          )
-        ),
-        term("select", "a", "w0$o0 AS myAvg")
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testProcTimeBoundedNonPartitionedRangeOver() = {
-    val result = table
-      .window(Over orderBy 'proctime preceding 10.second as 'w)
-      .select('a, 'c.count over 'w)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
-        ),
-        term("select", "a", "w0$o0 AS _c1")
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testProcTimeBoundedNonPartitionedRowsOver() = {
-    val result = table
-      .window(Over orderBy 'proctime preceding 2.rows as 'w)
-      .select('c, 'a.count over 'w)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS _c1")
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testProcTimeUnboundedPartitionedRangeOver() = {
-    val weightedAvg = new WeightedAvgWithRetract
-
-    val result = table
-      .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_RANGE following
-         CURRENT_RANGE as 'w)
-      .select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term(
-            "select",
-            "a",
-            "c",
-            "proctime",
-            "COUNT(a) AS w0$o0",
-            "WeightedAvgWithRetract(c, a) AS w0$o1"
-          )
-        ),
-        term(
-          "select",
-          "a",
-          "c",
-          "w0$o0 AS _c2",
-          "w0$o1 AS _c3"
-        )
-      )
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testProcTimeUnboundedPartitionedRowsOver() = {
-    val weightedAvg = new WeightedAvgWithRetract
-
-    val result = table
-      .window(
-        Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
-      .select('c, 'a.count over 'w, weightedAvg('c, 'a) over 'w)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime",
-               "COUNT(a) AS w0$o0",
-               "WeightedAvgWithRetract(c, a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS _c1", "w0$o1 AS _c2")
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testProcTimeUnboundedNonPartitionedRangeOver() = {
-    val result = table
-      .window(
-        Over orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
-      .select('a, 'c, 'a.count over 'w, 'a.sum over 'w)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term(
-            "select",
-            "a",
-            "c",
-            "proctime",
-            "COUNT(a) AS w0$o0",
-            "SUM(a) AS w0$o1"
-          )
-        ),
-        term(
-          "select",
-          "a",
-          "c",
-          "w0$o0 AS _c2",
-          "w0$o1 AS _c3"
-        )
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testProcTimeUnboundedNonPartitionedRowsOver() = {
-    val result = table
-      .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
-      .select('c, 'a.count over 'w)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS _c1")
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testRowTimeBoundedPartitionedRowsOver() = {
-    val weightedAvg = new WeightedAvgWithRetract
-
-    val result = table
-      .window(
-        Over partitionBy 'b orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
-      .select('c, 'b.count over 'w, weightedAvg('c, 'a) over 'w as 'wAvg)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "b", "c", "rowtime")
-          ),
-          term("partitionBy", "b"),
-          term("orderBy", "rowtime"),
-          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "b", "c", "rowtime",
-               "COUNT(b) AS w0$o0",
-               "WeightedAvgWithRetract(c, a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS _c1", "w0$o1 AS wAvg")
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testRowTimeBoundedPartitionedRangeOver() = {
-    val weightedAvg = new WeightedAvgWithRetract
-
-    val result = table
-      .window(
-        Over partitionBy 'a orderBy 'rowtime preceding 2.hours following CURRENT_RANGE as 'w)
-      .select('a, 'c.avg over 'w, weightedAvg('c, 'a) over 'w as 'wAvg)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "a"),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
-          term(
-            "select",
-            "a",
-            "c",
-            "rowtime",
-            "AVG(c) AS w0$o0",
-            "WeightedAvgWithRetract(c, a) AS w0$o1"
-          )
-        ),
-        term("select", "a", "w0$o0 AS _c1", "w0$o1 AS wAvg")
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testRowTimeBoundedNonPartitionedRangeOver() = {
-    val result = table
-      .window(Over orderBy 'rowtime preceding 10.second as 'w)
-      .select('a, 'c.count over 'w)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(c) AS w0$o0")
-        ),
-        term("select", "a", "w0$o0 AS _c1")
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testRowTimeBoundedNonPartitionedRowsOver() = {
-    val result = table
-      .window(Over orderBy 'rowtime preceding 2.rows as 'w)
-      .select('c, 'a.count over 'w)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS _c1")
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testRowTimeUnboundedPartitionedRangeOver() = {
-    val weightedAvg = new WeightedAvgWithRetract
-
-    val result = table
-      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_RANGE following
-         CURRENT_RANGE as 'w)
-      .select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w as 'wAvg)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term(
-            "select",
-            "a",
-            "c",
-            "rowtime",
-            "COUNT(a) AS w0$o0",
-            "WeightedAvgWithRetract(c, a) AS w0$o1"
-          )
-        ),
-        term(
-          "select",
-          "a",
-          "c",
-          "w0$o0 AS _c2",
-          "w0$o1 AS wAvg"
-        )
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testRowTimeUnboundedPartitionedRowsOver() = {
-    val weightedAvg = new WeightedAvgWithRetract
-
-    val result = table
-      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW following
-         CURRENT_ROW as 'w)
-      .select('c, 'a.count over 'w, weightedAvg('c, 'a) over 'w as 'wAvg)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "rowtime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime",
-               "COUNT(a) AS w0$o0",
-               "WeightedAvgWithRetract(c, a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS _c1", "w0$o1 AS wAvg")
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testRowTimeUnboundedNonPartitionedRangeOver() = {
-    val result = table
-      .window(
-        Over orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
-      .select('a, 'c, 'a.count over 'w, 'a.sum over 'w)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term(
-            "select",
-            "a",
-            "c",
-            "rowtime",
-            "COUNT(a) AS w0$o0",
-            "SUM(a) AS w0$o1"
-          )
-        ),
-        term(
-          "select",
-          "a",
-          "c",
-          "w0$o0 AS _c2",
-          "w0$o1 AS _c3"
-        )
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-
-  @Test
-  def testRowTimeUnboundedNonPartitionedRowsOver() = {
-    val result = table
-      .window(Over orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
-      .select('c, 'a.count over 'w)
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS _c1")
-      )
-
-    streamUtil.verifyTable(result, expected)
-  }
-}
-
-


Mime
View raw message