flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [33/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
Date Thu, 13 Jul 2017 10:18:42 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala
new file mode 100644
index 0000000..02f84c0
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.{Upper, WindowReference}
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
class CalcTest extends TableTestBase {

  // ----------------------------------------------------------------------------------------------
  // Tests for all the situations when we can do fields projection. Like selecting few fields
  // from a large field count source.
  // ----------------------------------------------------------------------------------------------

  // Aggregating over a non-keyed tumbling window: the optimizer must push the UPPER(c)
  // expression into a Calc below the window aggregate.
  @Test
  def testSelectFromWindow(): Unit = {
    val streamUtil = streamTestUtil()
    val inputTable =
      streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)

    val projected = inputTable
      .window(Tumble over 5.millis on 'rowtime as 'w)
      .groupBy('w)
      .select(Upper('c).count, 'a.sum)

    // Pre-projection Calc: keeps only the fields the aggregate needs and evaluates UPPER(c).
    val preProjection = unaryNode(
      "DataStreamCalc",
      streamTableNode(0),
      term("select", "c", "a", "rowtime", "UPPER(c) AS $f3")
    )

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      preProjection,
      term("window",
        TumblingGroupWindow(
          WindowReference("w"),
          'rowtime,
          5.millis)),
      term("select", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1")
    )

    streamUtil.verifyTable(projected, expected)
  }

  // Aggregating over a keyed tumbling window: besides the pre-projection Calc, a final Calc
  // must reorder the output so the grouping key 'b comes after the aggregates.
  @Test
  def testSelectFromGroupedWindow(): Unit = {
    val streamUtil = streamTestUtil()
    val inputTable =
      streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)

    val projected = inputTable
      .window(Tumble over 5.millis on 'rowtime as 'w)
      .groupBy('w, 'b)
      .select(Upper('c).count, 'a.sum, 'b)

    val windowAggregate = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        term("select", "c", "a", "b", "rowtime", "UPPER(c) AS $f4")
      ),
      term("groupBy", "b"),
      term("window",
        TumblingGroupWindow(
          WindowReference("w"),
          'rowtime,
          5.millis)),
      term("select", "b", "COUNT($f4) AS TMP_0", "SUM(a) AS TMP_1")
    )

    // Final Calc restores the field order requested by select().
    val expected = unaryNode(
      "DataStreamCalc",
      windowAggregate,
      term("select", "TMP_0", "TMP_1", "b")
    )

    streamUtil.verifyTable(projected, expected)
  }
}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
new file mode 100644
index 0000000..8f53f4a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.runtime.utils._
+import org.apache.flink.table.utils._
+import org.junit.Test
+
class CorrelateTest extends TableTestBase {

  // Plan tests for table-function correlates (cross/outer joins against a UDTF).
  // Each test compares the optimized plan string against a hand-written expectation,
  // so the term(...) literals below must match the planner output exactly.

  // Inner correlate with a one-argument and an overloaded two-argument UDTF call.
  @Test
  def testCrossJoin(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    val function = util.addFunction("func1", new TableFunc1)

    val result1 = table.join(function('c) as 's).select('c, 's)

    val expected1 = unaryNode(
      "DataStreamCalc",
      unaryNode(
        "DataStreamCorrelate",
        streamTableNode(0),
        // $2 is field 'c (0-based input index) in the invocation rex call.
        term("invocation", s"${function.functionIdentifier}($$2)"),
        term("function", function),
        term("rowType",
             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
        term("joinType", "INNER")
      ),
      term("select", "c", "s")
    )

    util.verifyTable(result1, expected1)

    // test overloading

    val result2 = table.join(function('c, "$") as 's).select('c, 's)

    val expected2 = unaryNode(
      "DataStreamCalc",
      unaryNode(
        "DataStreamCorrelate",
        streamTableNode(0),
        term("invocation", s"${function.functionIdentifier}($$2, '$$')"),
        term("function", function),
        term("rowType",
             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
        term("joinType", "INNER")
      ),
      term("select", "c", "s")
    )

    util.verifyTable(result2, expected2)
  }

  // Left outer correlate: identical plan shape but joinType LEFT.
  @Test
  def testLeftOuterJoin(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    val function = util.addFunction("func1", new TableFunc1)

    val result = table.leftOuterJoin(function('c) as 's).select('c, 's)

    val expected = unaryNode(
      "DataStreamCalc",
      unaryNode(
        "DataStreamCorrelate",
        streamTableNode(0),
        term("invocation", s"${function.functionIdentifier}($$2)"),
        term("function", function),
        term("rowType",
          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
        term("joinType", "LEFT")
      ),
      term("select", "c", "s")
    )

    util.verifyTable(result, expected)
  }

  // UDTF returning a custom (two-field) type.
  @Test
  def testCustomType(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    val function = util.addFunction("func2", new TableFunc2)

    val result = table.join(function('c) as ('name, 'len)).select('c, 'name, 'len)

    val expected = unaryNode(
      "DataStreamCalc",
      unaryNode(
        "DataStreamCorrelate",
        streamTableNode(0),
        term("invocation", s"${function.functionIdentifier}($$2)"),
        term("function", function),
        term("rowType",
          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
           "VARCHAR(2147483647) name, INTEGER len)"),
        term("joinType", "INNER")
      ),
      term("select", "c", "name", "len")
    )

    util.verifyTable(result, expected)
  }

  // UDTF returning a type hierarchy; no trailing select, so no Calc on top.
  @Test
  def testHierarchyType(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    val function = util.addFunction("hierarchy", new HierarchyTableFunction)

    val result = table.join(function('c) as ('name, 'adult, 'len))

    val expected = unaryNode(
      "DataStreamCorrelate",
      streamTableNode(0),
      term("invocation", s"${function.functionIdentifier}($$2)"),
      term("function", function),
      term("rowType",
        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
        " VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)"),
      term("joinType", "INNER")
    )

    util.verifyTable(result, expected)
  }

  // UDTF returning a POJO; note POJO fields appear in alphabetical order (age before name).
  @Test
  def testPojoType(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    val function = util.addFunction("pojo", new PojoTableFunc)

    val result = table.join(function('c))

    val expected = unaryNode(
      "DataStreamCorrelate",
      streamTableNode(0),
      term("invocation", s"${function.functionIdentifier}($$2)"),
      term("function", function),
      term("rowType",
        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
         "INTEGER age, VARCHAR(2147483647) name)"),
      term("joinType", "INNER")
    )

    util.verifyTable(result, expected)
  }

  // A filter on a UDTF output field is pushed into the correlate as a condition term.
  @Test
  def testFilter(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    val function = util.addFunction("func2", new TableFunc2)

    val result = table
      .join(function('c) as ('name, 'len))
      .select('c, 'name, 'len)
      .filter('len > 2)

    val expected = unaryNode(
      "DataStreamCalc",
      unaryNode(
        "DataStreamCorrelate",
        streamTableNode(0),
        term("invocation", s"${function.functionIdentifier}($$2)"),
        term("function", function),
        term("rowType",
          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
          "VARCHAR(2147483647) name, INTEGER len)"),
        term("joinType", "INNER"),
        // $1 refers to the 'len field of the UDTF output here.
        term("condition", ">($1, 2)")
      ),
      term("select", "c", "name", "len")
    )

    util.verifyTable(result, expected)
  }

  // A scalar expression as the UDTF argument is expanded inside the invocation call.
  @Test
  def testScalarFunction(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    val function = util.addFunction("func1", new TableFunc1)

    val result = table.join(function('c.substring(2)) as 's)

    val expected = unaryNode(
        "DataStreamCorrelate",
        streamTableNode(0),
        term("invocation",  s"${function.functionIdentifier}(SUBSTRING($$2, 2, CHAR_LENGTH($$2)))"),
        term("function", function),
        term("rowType",
          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
        term("joinType", "INNER")
    )

    util.verifyTable(result, expected)
  }

}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
new file mode 100644
index 0000000..a024460
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
@@ -0,0 +1,785 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.WindowReference
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.{Ignore, Test}
+
+class GroupWindowTest extends TableTestBase {
+
  // Two chained windows: the output of the first (keyed, tumbling) aggregation re-exposes a
  // proctime attribute ('w1.proctime) that the second (non-keyed, sliding) window consumes.
  @Test
  def testMultiWindow(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)

    val windowedTable = table
      .window(Tumble over 50.milli on 'proctime as 'w1)
      .groupBy('w1, 'string)
      .select('w1.proctime as 'proctime, 'string, 'int.count)
      .window(Slide over 20.milli every 10.milli on 'proctime as 'w2)
      .groupBy('w2)
      .select('string.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        unaryNode(
          "DataStreamGroupWindowAggregate",
          unaryNode(
            "DataStreamCalc",
            streamTableNode(0),
            term("select", "string", "int", "proctime")
          ),
          term("groupBy", "string"),
          term(
            "window",
            TumblingGroupWindow(
              WindowReference("w1"),
              'proctime,
              50.milli)),
          term("select", "string", "COUNT(int) AS TMP_1", "proctime('w1) AS TMP_0")
        ),
        // Intermediate Calc renames the window proctime back to 'proctime for window 'w2.
        term("select", "string", "TMP_0 AS proctime")
      ),
      term(
        "window",
        SlidingGroupWindow(
          WindowReference("w2"),
          'proctime,
          20.milli,
          10.milli)),
      term("select", "COUNT(string) AS TMP_2")
    )
    util.verifyTable(windowedTable, expected)
  }
+
  // Keyed processing-time tumbling window over a time interval.
  @Test
  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)

    val windowedTable = table
      .window(Tumble over 50.milli on 'proctime as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        // Unused field 'long is projected away before the aggregate.
        term("select", "string", "int", "proctime")
      ),
      term("groupBy", "string"),
      term(
        "window",
        TumblingGroupWindow(
          WindowReference("w"),
          'proctime,
          50.milli)),
      term("select", "string", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Keyed processing-time tumbling window over a row count.
  @Test
  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)

    val windowedTable = table
      .window(Tumble over 2.rows on 'proctime as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        term("select", "string", "int", "proctime")
      ),
      term("groupBy", "string"),
      term(
        "window",
        TumblingGroupWindow(WindowReference("w"), 'proctime, 2.rows)),
      term("select", "string", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Keyed event-time tumbling window; rowtime is the 'long field, so no pre-projection Calc.
  @Test
  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)

    val windowedTable = table
      .window(Tumble over 5.milli on 'long as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      streamTableNode(0),
      term("groupBy", "string"),
      term(
        "window",
        TumblingGroupWindow(
          WindowReference("w"),
          'long,
          5.milli)),
      term("select", "string", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Keyed event-time tumbling window with a user-defined (mergeable) aggregate function.
  @Test
  def testEventTimeTumblingGroupWindowWithUdAgg(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)

    val weightedAvg = new WeightedAvgWithMerge

    val windowedTable = table
      .window(Tumble over 5.milli on 'rowtime as 'w)
      .groupBy('w, 'string)
      .select('string, weightedAvg('long, 'int))

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      streamTableNode(0),
      term("groupBy", "string"),
      term(
        "window",
        TumblingGroupWindow(
          WindowReference("w"),
          'rowtime,
          5.milli)),
      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }
+
  // Keyed processing-time sliding window over time intervals.
  @Test
  def testProcessingTimeSlidingGroupWindowOverTime(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)

    val windowedTable = table
      .window(Slide over 50.milli every 50.milli on 'proctime as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        term("select", "string", "int", "proctime")
      ),
      term("groupBy", "string"),
      term(
        "window",
        SlidingGroupWindow(
          WindowReference("w"),
          'proctime,
          50.milli,
          50.milli)),
      term("select", "string", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Keyed processing-time sliding window over row counts.
  @Test
  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)

    val windowedTable = table
      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        term("select", "string", "int", "proctime")
      ),
      term("groupBy", "string"),
      term(
        "window",
        SlidingGroupWindow(
          WindowReference("w"),
          'proctime,
          2.rows,
          1.rows)),
      term("select", "string", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Keyed event-time sliding window over time intervals.
  @Test
  def testEventTimeSlidingGroupWindowOverTime(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)

    val windowedTable = table
      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        term("select", "string", "int", "rowtime")
      ),
      term("groupBy", "string"),
      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
      term("select", "string", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Ignored: see DataStreamGroupWindowAggregate. NOTE(review): despite the "OverCount"
  // name, the window below uses time intervals — presumably intentional; verify.
  @Test
  @Ignore // see comments in DataStreamGroupWindowAggregate
  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)

    val windowedTable = table
      .window(Slide over 8.milli every 10.milli on 'long as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      streamTableNode(0),
      term("groupBy", "string"),
      term(
        "window",
        SlidingGroupWindow(
          WindowReference("w"),
          'long,
          8.milli,
          10.milli)),
      term("select", "string", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }
+
  // Keyed event-time sliding window with a user-defined (mergeable) aggregate function.
  @Test
  def testEventTimeSlidingGroupWindowWithUdAgg(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)

    val weightedAvg = new WeightedAvgWithMerge

    val windowedTable = table
      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
      .groupBy('w, 'string)
      .select('string, weightedAvg('long, 'int))

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      streamTableNode(0),
      term("groupBy", "string"),
      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Keyed event-time session window with a 7ms gap.
  @Test
  def testEventTimeSessionGroupWindowOverTime(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)

    val windowedTable = table
      .window(Session withGap 7.milli on 'long as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      streamTableNode(0),
      term("groupBy", "string"),
      term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
      term("select", "string", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Keyed event-time session window with a user-defined aggregate (session windows
  // require a mergeable aggregate, hence WeightedAvgWithMerge).
  @Test
  def testEventTimeSessionGroupWindowWithUdAgg(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)

    val weightedAvg = new WeightedAvgWithMerge

    val windowedTable = table
      .window(Session withGap 7.milli on 'rowtime as 'w)
      .groupBy('w, 'string)
      .select('string, weightedAvg('long, 'int))

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      streamTableNode(0),
      term("groupBy", "string"),
      term("window", SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)),
      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }
+
  // "All" variants below exercise non-keyed windows (groupBy on the window alone), except
  // this first one which still groups on 'string — presumably mirrors the keyed test; verify.
  @Test
  def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)

    val windowedTable = table
      .window(Tumble over 50.milli on 'proctime as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        term("select", "string", "int", "proctime")
      ),
      term("groupBy", "string"),
      term(
        "window",
        TumblingGroupWindow(
          WindowReference("w"),
          'proctime,
          50.milli)),
      term("select", "string", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Non-keyed processing-time tumbling window over a row count.
  @Test
  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)

    val windowedTable = table
      .window(Tumble over 2.rows on 'proctime as 'w)
      .groupBy('w)
      .select('int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        term("select", "int", "proctime")
      ),
      term(
        "window",
        TumblingGroupWindow(
          WindowReference("w"),
          'proctime,
          2.rows)),
      term("select", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Non-keyed event-time tumbling window over a time interval.
  @Test
  def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)

    val windowedTable = table
      .window(Tumble over 5.milli on 'rowtime as 'w)
      .groupBy('w)
      .select('int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        term("select", "int", "rowtime")
      ),
      term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)),
      term("select", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Ignored: see DataStreamGroupWindowAggregate. NOTE(review): the "OverCount" name does
  // not match the time-based window definition below — confirm against the keyed twin.
  @Test
  @Ignore // see comments in DataStreamGroupWindowAggregate
  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)

    val windowedTable = table
      .window(Tumble over 5.milli on 'long as 'w)
      .groupBy('w)
      .select('int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        term("select", "int", "long")
      ),
      term(
        "window",
        TumblingGroupWindow(
          WindowReference("w"),
          'long,
          5.milli)),
      term("select", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }
+
  // Non-keyed processing-time sliding window over time intervals.
  @Test
  def testAllProcessingTimeSlidingGroupWindowOverTime(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)

    val windowedTable = table
      .window(Slide over 50.milli every 50.milli on 'proctime as 'w)
      .groupBy('w)
      .select('int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        term("select", "int", "proctime")
      ),
      term(
        "window",
        SlidingGroupWindow(
          WindowReference("w"),
          'proctime,
          50.milli,
          50.milli)),
      term("select", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Non-keyed processing-time sliding window over row counts.
  @Test
  def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)

    val windowedTable = table
      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
      .groupBy('w)
      .select('int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        term("select", "int", "proctime")
      ),
      term(
        "window",
        SlidingGroupWindow(
          WindowReference("w"),
          'proctime,
          2.rows,
          1.rows)),
      term("select", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }

  // Non-keyed event-time sliding window over time intervals.
  @Test
  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
    val util = streamTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)

    val windowedTable = table
      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
      .groupBy('w)
      .select('int.count)

    val expected = unaryNode(
      "DataStreamGroupWindowAggregate",
      unaryNode(
        "DataStreamCalc",
        streamTableNode(0),
        term("select", "int", "rowtime")
      ),
      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
      term("select", "COUNT(int) AS TMP_0")
    )

    util.verifyTable(windowedTable, expected)
  }
+
+  @Test
+//  @Ignore // see comments in DataStreamGroupWindowAggregate
+  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'long as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int", "long")
+      ),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeSessionGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    val windowedTable = table
+      .window(Session withGap 7.milli on 'long as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int", "long")
+      ),
+      term(
+        "window",
+        SessionGroupWindow(
+          WindowReference("w"),
+          'long,
+          7.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  // Tumbling event-time window that also selects the 'w.start / 'w.end window
+  // properties; the properties appear as start('w) / end('w) in the aggregate.
+  @Test
+  def testTumbleWindowStartEnd(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "rowtime")
+      ),
+      term("groupBy", "string"),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)),
+      term("select",
+        "string",
+        "COUNT(int) AS TMP_0",
+        "start('w) AS TMP_1",
+        "end('w) AS TMP_2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  // Count-based sliding window combined with a user-defined aggregate (WeightedAvg);
+  // the outer Calc projects only the aggregate result (TMP_0).
+  // Fixed inconsistent argument spacing ("'proctime,  2.rows" and
+  // term("select","TMP_0")) to match the formatting used elsewhere in this file.
+  @Test
+  def testSlidingWindowWithUDAF(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String, Int, Int)](
+      'long,
+      'int,
+      'string,
+      'int2,
+      'int3,
+      'proctime.proctime)
+
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+      .groupBy('w, 'int2, 'int3, 'string)
+      .select(weightAvgFun('long, 'int))
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          streamTableNode(0),
+          term("groupBy", "string, int2, int3"),
+          term("window", SlidingGroupWindow(WindowReference("w"), 'proctime, 2.rows, 1.rows)),
+          term(
+            "select",
+            "string",
+            "int2",
+            "int3",
+            "WeightedAvg(long, int) AS TMP_0")
+        ),
+        term("select", "TMP_0")
+      )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  // Sliding event-time window (10ms size, 5ms slide) that also selects the
+  // 'w.start / 'w.end window properties.
+  @Test
+  def testSlideWindowStartEnd(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "rowtime")
+      ),
+      term("groupBy", "string"),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 10.milli, 5.milli)),
+      term("select",
+        "string",
+        "COUNT(int) AS TMP_0",
+        "start('w) AS TMP_1",
+        "end('w) AS TMP_2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  // Selecting 'w.end twice: the duplicate window property must be computed once in
+  // the aggregate (TMP_0) and re-projected twice by the outer Calc, not evaluated
+  // a second time.
+  @Test
+  def testSessionWindowStartWithTwoEnd(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    val windowedTable = table
+      .window(Session withGap 3.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamGroupWindowAggregate",
+        streamTableNode(0),
+        term("groupBy", "string"),
+        term("window", SessionGroupWindow(WindowReference("w"), 'long, 3.milli)),
+        term("select",
+          "string",
+          "COUNT(int) AS TMP_1",
+          "end('w) AS TMP_0",
+          "start('w) AS TMP_2")
+      ),
+      term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  // Duplicate aggregations ('int.sum used twice) and duplicate window properties
+  // are computed once inside the window aggregate and reused by the outer Calc.
+  // NOTE(review): uses 5.millis here while other tests use 5.milli — presumably
+  // both interval forms are equivalent in the expression DSL; verify.
+  @Test
+  def testTumbleWindowWithDuplicateAggsAndProps(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 5.millis on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2,
+        'w.end as 'x3, 'w.end)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamGroupWindowAggregate",
+        streamTableNode(0),
+        term("groupBy", "string"),
+        term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.millis)),
+        term("select",
+          "string",
+          "SUM(int) AS TMP_0",
+          "start('w) AS TMP_1",
+          "end('w) AS TMP_2")
+      ),
+      term("select",
+        "string",
+        "+(CAST(TMP_0), 1) AS s1",
+        "+(CAST(TMP_0), 3) AS s2",
+        "TMP_1 AS x",
+        "TMP_1 AS x2",
+        "TMP_2 AS x3",
+        "TMP_2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
new file mode 100644
index 0000000..8b563a3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.Func1
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithRetract
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+/**
+  * Plan tests for Table API over-window aggregations on a streaming table.
+  *
+  * Each test defines an Over window on `MyTable` (columns 'a, 'b, 'c plus a
+  * processing-time and a row-time attribute) and asserts the translated logical
+  * plan string via `streamUtil.verifyTable`.
+  *
+  * Fixed: all test methods previously used `def testX() = {` with an inferred
+  * result type; explicit `: Unit` is added for consistency with the sibling test
+  * classes of this commit and Scala style for side-effecting methods.
+  */
+class OverWindowTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  // Shared input table with both a proctime and a rowtime attribute.
+  val table: Table = streamUtil.addTable[(Int, String, Long)]("MyTable",
+    'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+  // Scalar functions (UDF, EXP, +, ||, ARRAY) applied on top of over-window aggregates.
+  @Test
+  def testScalarFunctionsOnOverWindow(): Unit = {
+    val weightedAvg = new WeightedAvgWithRetract
+    val plusOne = Func1
+
+    val result = table
+      .window(Over partitionBy 'b orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
+      .select(
+        plusOne('a.sum over 'w as 'wsum) as 'd,
+        ('a.count over 'w).exp(),
+        (weightedAvg('c, 'a) over 'w) + 1,
+        "AVG:".toExpr + (weightedAvg('c, 'a) over 'w),
+        array(weightedAvg('c, 'a) over 'w, 'a.count over 'w))
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "b", "c", "proctime")
+          ),
+          term("partitionBy", "b"),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "b", "c", "proctime",
+               "SUM(a) AS w0$o0",
+               "COUNT(a) AS w0$o1",
+               "WeightedAvgWithRetract(c, a) AS w0$o2")
+        ),
+        term("select",
+             s"${plusOne.functionIdentifier}(w0$$o0) AS d",
+             "EXP(CAST(w0$o1)) AS _c1",
+             "+(w0$o2, 1) AS _c2",
+             "||('AVG:', CAST(w0$o2)) AS _c3",
+             "ARRAY(w0$o2, w0$o1) AS _c4")
+      )
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Proctime, bounded by row count, partitioned.
+  @Test
+  def testProcTimeBoundedPartitionedRowsOver(): Unit = {
+    val weightedAvg = new WeightedAvgWithRetract
+
+    val result = table
+      .window(Over partitionBy 'b orderBy 'proctime preceding 2.rows following CURRENT_ROW as 'w)
+      .select('c, weightedAvg('c, 'a) over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "b", "c", "proctime")
+          ),
+          term("partitionBy", "b"),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "b", "c", "proctime", "WeightedAvgWithRetract(c, a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Proctime, bounded by time range (2h = 7200000 ms), partitioned.
+  @Test
+  def testProcTimeBoundedPartitionedRangeOver(): Unit = {
+    val weightedAvg = new WeightedAvgWithRetract
+
+    val result = table
+      .window(
+        Over partitionBy 'a orderBy 'proctime preceding 2.hours following CURRENT_RANGE as 'w)
+      .select('a, weightedAvg('c, 'a) over 'w as 'myAvg)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("partitionBy", "a"),
+          term("orderBy", "proctime"),
+          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "proctime",
+            "WeightedAvgWithRetract(c, a) AS w0$o0"
+          )
+        ),
+        term("select", "a", "w0$o0 AS myAvg")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Proctime, bounded by time range (10s = 10000 ms), no partitioning.
+  @Test
+  def testProcTimeBoundedNonPartitionedRangeOver(): Unit = {
+    val result = table
+      .window(Over orderBy 'proctime preceding 10.second as 'w)
+      .select('a, 'c.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("orderBy", "proctime"),
+          term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
+        ),
+        term("select", "a", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Proctime, bounded by row count, no partitioning.
+  @Test
+  def testProcTimeBoundedNonPartitionedRowsOver(): Unit = {
+    val result = table
+      .window(Over orderBy 'proctime preceding 2.rows as 'w)
+      .select('c, 'a.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Proctime, unbounded range, partitioned.
+  @Test
+  def testProcTimeUnboundedPartitionedRangeOver(): Unit = {
+    val weightedAvg = new WeightedAvgWithRetract
+
+    val result = table
+      .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_RANGE following
+         CURRENT_RANGE as 'w)
+      .select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "proctime"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "proctime",
+            "COUNT(a) AS w0$o0",
+            "WeightedAvgWithRetract(c, a) AS w0$o1"
+          )
+        ),
+        term(
+          "select",
+          "a",
+          "c",
+          "w0$o0 AS _c2",
+          "w0$o1 AS _c3"
+        )
+      )
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Proctime, unbounded rows, partitioned.
+  @Test
+  def testProcTimeUnboundedPartitionedRowsOver(): Unit = {
+    val weightedAvg = new WeightedAvgWithRetract
+
+    val result = table
+      .window(
+        Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
+      .select('c, 'a.count over 'w, weightedAvg('c, 'a) over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime",
+               "COUNT(a) AS w0$o0",
+               "WeightedAvgWithRetract(c, a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS _c1", "w0$o1 AS _c2")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Proctime, unbounded range, no partitioning.
+  @Test
+  def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
+    val result = table
+      .window(
+        Over orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
+      .select('a, 'c, 'a.count over 'w, 'a.sum over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("orderBy", "proctime"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "proctime",
+            "COUNT(a) AS w0$o0",
+            "SUM(a) AS w0$o1"
+          )
+        ),
+        term(
+          "select",
+          "a",
+          "c",
+          "w0$o0 AS _c2",
+          "w0$o1 AS _c3"
+        )
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Proctime, unbounded rows, no partitioning.
+  @Test
+  def testProcTimeUnboundedNonPartitionedRowsOver(): Unit = {
+    val result = table
+      .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
+      .select('c, 'a.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "proctime")
+          ),
+          term("orderBy", "proctime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Rowtime, bounded by row count, partitioned.
+  @Test
+  def testRowTimeBoundedPartitionedRowsOver(): Unit = {
+    val weightedAvg = new WeightedAvgWithRetract
+
+    val result = table
+      .window(
+        Over partitionBy 'b orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+      .select('c, 'b.count over 'w, weightedAvg('c, 'a) over 'w as 'wAvg)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "b", "c", "rowtime")
+          ),
+          term("partitionBy", "b"),
+          term("orderBy", "rowtime"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "b", "c", "rowtime",
+               "COUNT(b) AS w0$o0",
+               "WeightedAvgWithRetract(c, a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS _c1", "w0$o1 AS wAvg")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Rowtime, bounded by time range (2h), partitioned.
+  @Test
+  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
+    val weightedAvg = new WeightedAvgWithRetract
+
+    val result = table
+      .window(
+        Over partitionBy 'a orderBy 'rowtime preceding 2.hours following CURRENT_RANGE as 'w)
+      .select('a, 'c.avg over 'w, weightedAvg('c, 'a) over 'w as 'wAvg)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("partitionBy", "a"),
+          term("orderBy", "rowtime"),
+          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "rowtime",
+            "AVG(c) AS w0$o0",
+            "WeightedAvgWithRetract(c, a) AS w0$o1"
+          )
+        ),
+        term("select", "a", "w0$o0 AS _c1", "w0$o1 AS wAvg")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Rowtime, bounded by time range (10s), no partitioning.
+  @Test
+  def testRowTimeBoundedNonPartitionedRangeOver(): Unit = {
+    val result = table
+      .window(Over orderBy 'rowtime preceding 10.second as 'w)
+      .select('a, 'c.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("orderBy", "rowtime"),
+          term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(c) AS w0$o0")
+        ),
+        term("select", "a", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Rowtime, bounded by row count, no partitioning.
+  @Test
+  def testRowTimeBoundedNonPartitionedRowsOver(): Unit = {
+    val result = table
+      .window(Over orderBy 'rowtime preceding 2.rows as 'w)
+      .select('c, 'a.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("orderBy", "rowtime"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Rowtime, unbounded range, partitioned.
+  @Test
+  def testRowTimeUnboundedPartitionedRangeOver(): Unit = {
+    val weightedAvg = new WeightedAvgWithRetract
+
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_RANGE following
+         CURRENT_RANGE as 'w)
+      .select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w as 'wAvg)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "rowtime"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "rowtime",
+            "COUNT(a) AS w0$o0",
+            "WeightedAvgWithRetract(c, a) AS w0$o1"
+          )
+        ),
+        term(
+          "select",
+          "a",
+          "c",
+          "w0$o0 AS _c2",
+          "w0$o1 AS wAvg"
+        )
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Rowtime, unbounded rows, partitioned.
+  @Test
+  def testRowTimeUnboundedPartitionedRowsOver(): Unit = {
+    val weightedAvg = new WeightedAvgWithRetract
+
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW following
+         CURRENT_ROW as 'w)
+      .select('c, 'a.count over 'w, weightedAvg('c, 'a) over 'w as 'wAvg)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "rowtime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime",
+               "COUNT(a) AS w0$o0",
+               "WeightedAvgWithRetract(c, a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS _c1", "w0$o1 AS wAvg")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Rowtime, unbounded range, no partitioning.
+  @Test
+  def testRowTimeUnboundedNonPartitionedRangeOver(): Unit = {
+    val result = table
+      .window(
+        Over orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
+      .select('a, 'c, 'a.count over 'w, 'a.sum over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("orderBy", "rowtime"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "rowtime",
+            "COUNT(a) AS w0$o0",
+            "SUM(a) AS w0$o1"
+          )
+        ),
+        term(
+          "select",
+          "a",
+          "c",
+          "w0$o0 AS _c2",
+          "w0$o1 AS _c3"
+        )
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  // Rowtime, unbounded rows, no partitioning.
+  @Test
+  def testRowTimeUnboundedNonPartitionedRowsOver(): Unit = {
+    val result = table
+      .window(Over orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
+      .select('c, 'a.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "rowtime")
+          ),
+          term("orderBy", "rowtime"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
new file mode 100644
index 0000000..5d4386c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+/**
+  * Plan tests for stream table sources that declare rowtime / proctime attributes
+  * (see TestRowtimeSource / TestProctimeSource below). Each test registers a
+  * source, applies Table API operations, and asserts the expected plan string.
+  */
+class TableSourceTest extends TableTestBase {
+
+  // Selecting a rowtime attribute materializes it (TIME_MATERIALIZATION) in the Calc.
+  @Test
+  def testRowTimeTableSourceSimple(): Unit = {
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource("rowTimeT", new TestRowtimeSource("addTime"))
+
+    val t = util.tableEnv.scan("rowTimeT").select("addTime, id, name, val")
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
+        term("select", "TIME_MATERIALIZATION(addTime) AS addTime", "id", "name", "val")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  // A group window defined on the source's rowtime attribute (10 min tumble).
+  @Test
+  def testRowTimeTableSourceGroupWindow(): Unit = {
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource("rowTimeT", new TestRowtimeSource("addTime"))
+
+    val t = util.tableEnv.scan("rowTimeT")
+      .filter("val > 100")
+      .window(Tumble over 10.minutes on 'addTime as 'w)
+      .groupBy('name, 'w)
+      .select('name, 'w.end, 'val.avg)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
+            term("select", "name", "val", "addTime"),
+            term("where", ">(val, 100)")
+          ),
+          term("groupBy", "name"),
+          term("window", "TumblingGroupWindow('w, 'addTime, 600000.millis)"),
+          term("select", "name", "AVG(val) AS TMP_1", "end('w) AS TMP_0")
+        ),
+        term("select", "name", "TMP_0", "TMP_1")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  // Selecting a proctime attribute also materializes it in the Calc.
+  @Test
+  def testProcTimeTableSourceSimple(): Unit = {
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
+
+    val t = util.tableEnv.scan("procTimeT").select("pTime, id, name, val")
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
+        term("select", "TIME_MATERIALIZATION(pTime) AS pTime", "id", "name", "val")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  // Over window on the source's proctime attribute; the filter on the aggregate
+  // result stays in the outer Calc ("where" term).
+  @Test
+  def testProcTimeTableSourceOverWindow(): Unit = {
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
+
+    val t = util.tableEnv.scan("procTimeT")
+      .window(Over partitionBy 'id orderBy 'pTime preceding 2.hours as 'w)
+      .select('id, 'name, 'val.sum over 'w as 'valSum)
+      .filter('valSum > 100)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
+          term("partitionBy", "id"),
+          term("orderBy", "pTime"),
+          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+          term("select", "id", "val", "name", "pTime", "SUM(val) AS w0$o0")
+        ),
+        term("select", "id", "name", "w0$o0 AS valSum"),
+        term("where", ">(w0$o0, 100)")
+      )
+    util.verifyTable(t, expected)
+  }
+}
+
+/** Test table source exposing an event-time (rowtime) attribute named by `timeField`. */
+class TestRowtimeSource(timeField: String)
+    extends StreamTableSource[Row] with DefinedRowtimeAttribute {
+
+  // Never executed by the plan tests — only the schema and attribute name matter.
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+
+  override def getRowtimeAttribute: String = timeField
+
+  override def getReturnType: TypeInformation[Row] = {
+    // Fixed three-column schema: id (INT), val (LONG), name (STRING).
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG, Types.STRING)
+    new RowTypeInfo(fieldTypes, Array("id", "val", "name"))
+  }
+}
+
+/** Test table source exposing a processing-time attribute named by `timeField`. */
+class TestProctimeSource(timeField: String)
+    extends StreamTableSource[Row] with DefinedProctimeAttribute {
+
+  // Never executed by the plan tests — only the schema and attribute name matter.
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+
+  override def getProctimeAttribute: String = timeField
+
+  override def getReturnType: TypeInformation[Row] = {
+    // Same fixed schema as TestRowtimeSource: id (INT), val (LONG), name (STRING).
+    val names = Array("id", "val", "name")
+    val types = Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)
+    new RowTypeInfo(types, names)
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
new file mode 100644
index 0000000..2bef95e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+  * Verifies that aggregations expressed via the Java string API and the Scala
+  * expression API translate to identical plans (verifyTableEquals).
+  */
+class AggregateStringExpressionTest extends TableTestBase {
+
+  // Grouped aggregation including a registered user-defined aggregate function.
+  @Test
+  def testGroupedAggregate(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .groupBy('string)
+      .select('int.count as 'cnt, weightAvgFun('long, 'int))
+
+    // String / Java API
+    val resJava = t
+      .groupBy("string")
+      .select("int.count as cnt, weightAvgFun(long, int)")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+  // Global (non-grouped) aggregation.
+  @Test
+  def testNonGroupedAggregate(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    // Expression / Scala API
+    val resScala = t.select('int.count as 'cnt, 'long.sum)
+
+    // String / Java API
+    val resJava = t.select("int.count as cnt, long.sum")
+
+    verifyTableEquals(resJava, resScala)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CalcStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CalcStringExpressionTest.scala
new file mode 100644
index 0000000..6fd0158
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CalcStringExpressionTest.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+  * Verifies that projections and filters written with the Java string-based expression
+  * API are parsed into the same logical plan as the Scala expression API equivalents.
+  */
+class CalcStringExpressionTest extends TableTestBase {
+
+  @Test
+  def testSimpleSelect(): Unit = {
+    val testUtil = streamTestUtil()
+    val table = testUtil.addTable[(Int, Long, String)]()
+
+    // Positional field references ('_1, '_2) in both API flavors.
+    val scalaTable = table.select('_1, '_2)
+    val javaTable = table.select("_1, _2")
+    verifyTableEquals(javaTable, scalaTable)
+  }
+
+  @Test
+  def testSelectStar(): Unit = {
+    val testUtil = streamTestUtil()
+    val table = testUtil.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    // Wildcard projection in both API flavors.
+    val scalaTable = table.select('*)
+    val javaTable = table.select("*")
+    verifyTableEquals(javaTable, scalaTable)
+  }
+
+  @Test
+  def testSelectWithWhere(): Unit = {
+    val testUtil = streamTestUtil()
+    val table = testUtil.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    // Filter followed by projection.
+    val scalaTable = table.where('string === "true").select('int)
+    val javaTable = table.where("string === 'true'").select("int")
+    verifyTableEquals(javaTable, scalaTable)
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+    val testUtil = streamTestUtil()
+    val table = testUtil.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    // Named field references.
+    val scalaTable = table.select('int, 'string)
+    val javaTable = table.select("int, string")
+    verifyTableEquals(javaTable, scalaTable)
+  }
+
+  @Test
+  def testSimpleSelectWithAlias(): Unit = {
+    val testUtil = streamTestUtil()
+    val table = testUtil.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    // Aliasing via 'as' in both API flavors.
+    val scalaTable = table.select('int as 'myInt, 'string as 'myString)
+    val javaTable = table.select("int as myInt, string as myString")
+    verifyTableEquals(javaTable, scalaTable)
+  }
+
+  @Test
+  def testSimpleFilter(): Unit = {
+    val testUtil = streamTestUtil()
+    val table = testUtil.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    // Equality filter with an aliased projection.
+    val scalaTable = table.filter('int === 3).select('int as 'myInt, 'string)
+    val javaTable = table.filter("int === 3").select("int as myInt, string")
+    verifyTableEquals(javaTable, scalaTable)
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    val testUtil = streamTestUtil()
+    val table = testUtil.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    // Constant-false predicate: Scala uses a Literal, Java a bare "false" string.
+    val scalaTable = table.filter(Literal(false)).select('int as 'myInt, 'string)
+    val javaTable = table.filter("false").select("int as myInt, string")
+    verifyTableEquals(javaTable, scalaTable)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    val testUtil = streamTestUtil()
+    val table = testUtil.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    // Constant-true predicate: Scala uses a Literal, Java a bare "true" string.
+    val scalaTable = table.filter(Literal(true)).select('int as 'myInt, 'string)
+    val javaTable = table.filter("true").select("int as myInt, string")
+    verifyTableEquals(javaTable, scalaTable)
+  }
+
+  @Test
+  def testNotEqualsFilter(): Unit = {
+    val testUtil = streamTestUtil()
+    val table = testUtil.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    // Chained filters combining inequality and LIKE.
+    val scalaTable = table.filter('int !== 2).filter('string.like("%world%")).select('int, 'string)
+    val javaTable = table.filter("int !== 2").filter("string.like('%world%')").select("int, string")
+    verifyTableEquals(javaTable, scalaTable)
+  }
+
+  @Test
+  def testFilterWithExpression(): Unit = {
+    val testUtil = streamTestUtil()
+    val table = testUtil.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+    // Arithmetic expression inside the predicate.
+    val scalaTable = table.filter('int % 2 === 0).select('int, 'string)
+    val javaTable = table.filter("int % 2 === 0").select("int, string")
+    verifyTableEquals(javaTable, scalaTable)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
new file mode 100644
index 0000000..0d12400
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.stringexpr
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api._
+import org.apache.flink.table.runtime.utils._
+import org.apache.flink.table.utils._
+import org.apache.flink.types.Row
+import org.junit.Test
+
+/**
+  * Verifies that table-function (correlate) joins built from Java string expressions —
+  * including Table instances constructed directly from a string expression — produce the
+  * same logical plan as the Scala expression API.
+  */
+class CorrelateStringExpressionTest extends TableTestBase {
+
+  @Test
+  def testCorrelateJoins(): Unit = {
+
+    val util = streamTestUtil()
+    // Scala-API table and an equivalent Java-API table with the same row type and fields.
+    val sTab = util.addTable[(Int, Long, String)]('a, 'b, 'c)
+    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+    val jTab = util.addJavaTable[Row](typeInfo,"MyTab","a, b, c")
+
+    // test cross join
+    val func1 = new TableFunc1
+    util.javaTableEnv.registerFunction("func1", func1)
+    // Java side builds the correlated Table from a string expression ("func1(c).as(s)").
+    var scalaTable = sTab.join(func1('c) as 's).select('c, 's)
+    var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test left outer join
+    scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's)
+    javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "func1(c)").as("s")).select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test overloading
+    scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test custom result type
+    val func2 = new TableFunc2
+    util.javaTableEnv.registerFunction("func2", func2)
+    scalaTable = sTab.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
+    javaTable = jTab.join(
+      new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test hierarchy generic type
+    val hierarchy = new HierarchyTableFunction
+    util.javaTableEnv.registerFunction("hierarchy", hierarchy)
+    // Prefix AS(...) syntax on the Java side, infix 'as' on the Scala side.
+    scalaTable = sTab.join(hierarchy('c) as ('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
+      .select("c, name, len, adult")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test pojo type
+    val pojo = new PojoTableFunc
+    util.javaTableEnv.registerFunction("pojo", pojo)
+    scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with filter
+    scalaTable = sTab.join(func2('c) as ('name, 'len)).select('c, 'name, 'len).filter('len > 2)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)"))
+      .select("c, name, len").filter("len > 2")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with scalar function
+    // Scalar function nested inside the table-function argument.
+    scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's)
+    javaTable = jTab.join(
+      new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s")
+    verifyTableEquals(scalaTable, javaTable)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
new file mode 100644
index 0000000..2cb5a8a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
+import org.apache.flink.table.api.java.{Session => JSession, Slide => JSlide, Tumble => JTumble}
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+  * Verifies that group-window aggregations specified via the Java string-based API
+  * (JTumble / JSlide / JSession with string clauses) translate to the same logical plan
+  * as the Scala expression API (Tumble / Slide / Session with expressions).
+  * Covers tumbling, sliding, and session windows on event time and processing time.
+  */
+class GroupWindowStringExpressionTest extends TableTestBase {
+
+  // Sliding event-time window, grouped by window and key.
+  @Test
+  def testRowTimeSlide(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'rowtime.rowtime)
+
+    // Register UDAGGs so the string API can resolve them by name.
+    val myCountFun = new CountAggFunction
+    util.tableEnv.registerFunction("myCountFun", myCountFun)
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Slide over 4.hours every 2.hours on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select(
+        'string,
+        myCountFun('string),
+        'int.sum,
+        weightAvgFun('long, 'int),
+        weightAvgFun('int, 'int) * 2,
+        'w.start,
+        'w.end)
+
+    // String / Java API
+    val resJava = t
+      .window(JSlide.over("4.hours").every("2.hours").on("rowtime").as("w"))
+      .groupBy("w, string")
+      .select(
+        "string, " +
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w)," +
+        "end(w)")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+  // Tumbling event-time window; rowtime attribute declared mid-schema.
+  @Test
+  def testRowTimeTumble(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, Long, String)]('int, 'long, 'rowtime.rowtime, 'string)
+
+    val myCountFun = new CountAggFunction
+    util.tableEnv.registerFunction("myCountFun", myCountFun)
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Tumble over 4.hours on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select(
+        'string,
+        myCountFun('string),
+        'int.sum,
+        weightAvgFun('long, 'int),
+        weightAvgFun('int, 'int) * 2,
+        'w.start,
+        'w.end)
+
+    // String / Java API
+    val resJava = t
+      .window(JTumble.over("4.hours").on("rowtime").as("w"))
+      .groupBy("w, string")
+      .select(
+        "string, " +
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w)," +
+        "end(w)")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+  // Session event-time window; only the window start is projected.
+  @Test
+  def testRowTimeSession(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'rowtime.rowtime)
+
+    val myCountFun = new CountAggFunction
+    util.tableEnv.registerFunction("myCountFun", myCountFun)
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Session withGap 4.hours on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select(
+        'string,
+        myCountFun('string),
+        'int.sum,
+        weightAvgFun('long, 'int),
+        weightAvgFun('int, 'int) * 2,
+        'w.start)
+
+    // String / Java API
+    val resJava = t
+      .window(JSession.withGap("4.hours").on("rowtime").as("w"))
+      .groupBy("w, string")
+      .select(
+        "string, " +
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w)"
+      )
+
+    verifyTableEquals(resJava, resScala)
+  }
+  // Sliding processing-time window, grouped by window only (no key).
+  @Test
+  def testProcTimeSlide(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'proctime.proctime)
+
+    val myCountFun = new CountAggFunction
+    util.tableEnv.registerFunction("myCountFun", myCountFun)
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Slide over 4.hours every 2.hours on 'proctime as 'w)
+      .groupBy('w)
+      .select(
+        myCountFun('string),
+        'int.sum,
+        weightAvgFun('long, 'int),
+        weightAvgFun('int, 'int) * 2,
+        'w.start,
+        'w.end)
+
+    // String / Java API
+    val resJava = t
+      .window(JSlide.over("4.hours").every("2.hours").on("proctime").as("w"))
+      .groupBy("w")
+      .select(
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w)," +
+        "end(w)")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+  // Tumbling processing-time window, grouped by window only.
+  @Test
+  def testProcTimeTumble(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long,'string, 'proctime.proctime)
+
+    val myCountFun = new CountAggFunction
+    util.tableEnv.registerFunction("myCountFun", myCountFun)
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Tumble over 4.hours on 'proctime as 'w)
+      .groupBy('w)
+      .select(
+        myCountFun('string),
+        'int.sum,
+        weightAvgFun('long, 'int),
+        weightAvgFun('int, 'int) * 2,
+        'w.start,
+        'w.end)
+
+    // String / Java API
+    val resJava = t
+      .window(JTumble.over("4.hours").on("proctime").as("w"))
+      .groupBy("w")
+      .select(
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w)," +
+        "end(w)")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+  // Session processing-time window, grouped by window only.
+  @Test
+  def testProcTimeSession(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'proctime.proctime)
+
+    val myCountFun = new CountAggFunction
+    util.tableEnv.registerFunction("myCountFun", myCountFun)
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Session withGap 4.hours on 'proctime as 'w)
+      .groupBy('w)
+      .select(
+        myCountFun('string),
+        'int.sum,
+        weightAvgFun('long, 'int),
+        weightAvgFun('int, 'int) * 2,
+        'w.start,
+        'w.end)
+
+    // String / Java API
+    val resJava = t
+      .window(JSession.withGap("4.hours").on("proctime").as("w"))
+      .groupBy("w")
+      .select(
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w), " +
+        "end(w)"
+      )
+
+    verifyTableEquals(resJava, resScala)
+  }
+}


Mime
View raw message