flink-commits mailing list archives

From twal...@apache.org
Subject [39/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
Date Thu, 13 Jul 2017 10:18:48 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
deleted file mode 100644
index 85cfb18..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
+++ /dev/null
@@ -1,467 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.sql
-
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class DistinctAggregateTest extends TableTestBase {
-
-  @Test
-  def testSingleDistinctAggregate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT COUNT(DISTINCT a) FROM MyTable"
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      unaryNode(
-        "DataSetUnion",
-        unaryNode(
-          "DataSetValues",
-          unaryNode(
-            "DataSetDistinct",
-            unaryNode(
-              "DataSetCalc",
-              batchTableNode(0),
-              term("select", "a")
-            ),
-            term("distinct", "a")
-          ),
-          tuples(List(null)),
-          term("values", "a")
-        ),
-        term("union", "a")
-      ),
-      term("select", "COUNT(a) AS EXPR$0")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testMultiDistinctAggregateOnSameColumn(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable"
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      unaryNode(
-        "DataSetUnion",
-        unaryNode(
-          "DataSetValues",
-          unaryNode(
-            "DataSetDistinct",
-            unaryNode(
-              "DataSetCalc",
-              batchTableNode(0),
-              term("select", "a")
-            ),
-            term("distinct", "a")
-          ),
-          tuples(List(null)),
-          term("values", "a")
-        ),
-        term("union", "a")
-      ),
-      term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
-    val sqlQuery0 = "SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable"
-
-    val expected0 = unaryNode(
-      "DataSetAggregate",
-      unaryNode(
-        "DataSetUnion",
-        unaryNode(
-          "DataSetValues",
-          unaryNode(
-            "DataSetAggregate",
-            unaryNode(
-              "DataSetCalc",
-              batchTableNode(0),
-              term("select", "a", "b")
-            ),
-            term("groupBy", "a"),
-            term("select", "a", "SUM(b) AS EXPR$1")
-          ),
-          tuples(List(null, null)),
-          term("values", "a", "EXPR$1")
-        ),
-        term("union", "a", "EXPR$1")
-      ),
-      term("select", "COUNT(a) AS EXPR$0", "SUM(EXPR$1) AS EXPR$1")
-    )
-
-    util.verifySql(sqlQuery0, expected0)
-
-    // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
-    val sqlQuery1 = "SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable"
-
-    val expected1 = unaryNode(
-      "DataSetAggregate",
-      unaryNode(
-        "DataSetUnion",
-        unaryNode(
-          "DataSetValues",
-          unaryNode(
-            "DataSetAggregate",
-            unaryNode(
-              "DataSetCalc",
-              batchTableNode(0),
-              term("select", "a", "b")
-            ),
-            term("groupBy", "b"),
-            term("select", "b", "COUNT(a) AS EXPR$0")
-          ),
-          tuples(List(null, null)),
-          term("values", "b", "EXPR$0")
-        ),
-        term("union", "b", "EXPR$0")
-      ),
-      term("select", "$SUM0(EXPR$0) AS EXPR$0", "SUM(b) AS EXPR$1")
-    )
-
-    util.verifySql(sqlQuery1, expected1)
-  }
-
-  @Test
-  def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable"
-
-    val expected = binaryNode(
-      "DataSetSingleRowJoin",
-      unaryNode(
-        "DataSetAggregate",
-        unaryNode(
-          "DataSetUnion",
-          unaryNode(
-            "DataSetValues",
-            unaryNode(
-              "DataSetDistinct",
-              unaryNode(
-                "DataSetCalc",
-                batchTableNode(0),
-                term("select", "a")
-              ),
-              term("distinct", "a")
-            ),
-            tuples(List(null)),
-            term("values", "a")
-          ),
-          term("union", "a")
-        ),
-        term("select", "COUNT(a) AS EXPR$0")
-      ),
-      unaryNode(
-        "DataSetAggregate",
-        unaryNode(
-          "DataSetUnion",
-          unaryNode(
-            "DataSetValues",
-            unaryNode(
-              "DataSetDistinct",
-              unaryNode(
-                "DataSetCalc",
-                batchTableNode(0),
-                term("select", "b")
-              ),
-              term("distinct", "b")
-            ),
-            tuples(List(null)),
-            term("values", "b")
-          ),
-          term("union", "b")
-        ),
-        term("select", "SUM(b) AS EXPR$1")
-      ),
-      term("where", "true"),
-      term("join", "EXPR$0", "EXPR$1"),
-      term("joinType", "NestedLoopInnerJoin")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testMultiDistinctAndNonDistinctAggregateOnDifferentColumn(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM MyTable"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      binaryNode(
-        "DataSetSingleRowJoin",
-        binaryNode(
-          "DataSetSingleRowJoin",
-          unaryNode(
-            "DataSetAggregate",
-            unaryNode(
-              "DataSetUnion",
-              unaryNode(
-                "DataSetValues",
-                batchTableNode(0),
-                tuples(List(null, null, null)),
-                term("values", "a, b, c")
-              ),
-              term("union", "a, b, c")
-            ),
-            term("select", "COUNT(c) AS EXPR$2")
-          ),
-          unaryNode(
-            "DataSetAggregate",
-            unaryNode(
-              "DataSetUnion",
-              unaryNode(
-                "DataSetValues",
-                unaryNode(
-                  "DataSetDistinct",
-                  unaryNode(
-                    "DataSetCalc",
-                    batchTableNode(0),
-                    term("select", "a")
-                  ),
-                  term("distinct", "a")
-                ),
-                tuples(List(null)),
-                term("values", "a")
-              ),
-              term("union", "a")
-            ),
-            term("select", "COUNT(a) AS EXPR$0")
-          ),
-          term("where", "true"),
-          term("join", "EXPR$2, EXPR$0"),
-          term("joinType", "NestedLoopInnerJoin")
-        ),
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetUnion",
-            unaryNode(
-              "DataSetValues",
-              unaryNode(
-                "DataSetDistinct",
-                unaryNode(
-                  "DataSetCalc",
-                  batchTableNode(0),
-                  term("select", "b")
-                ),
-                term("distinct", "b")
-              ),
-              tuples(List(null)),
-              term("values", "b")
-            ),
-            term("union", "b")
-          ),
-          term("select", "SUM(b) AS EXPR$1")
-        ),
-        term("where", "true"),
-        term("join", "EXPR$2", "EXPR$0, EXPR$1"),
-        term("joinType", "NestedLoopInnerJoin")
-      ),
-      term("select", "EXPR$0, EXPR$1, EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSingleDistinctAggregateWithGrouping(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP BY a"
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      unaryNode(
-        "DataSetAggregate",
-        unaryNode(
-          "DataSetCalc",
-          batchTableNode(0),
-          term("select", "a", "b")
-        ),
-        term("groupBy", "a", "b"),
-        term("select", "a", "b", "COUNT(a) AS EXPR$1")
-      ),
-      term("groupBy", "a"),
-      term("select", "a", "SUM(EXPR$1) AS EXPR$1", "SUM(b) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSingleDistinctAggregateWithGroupingAndCountStar(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b) FROM MyTable GROUP BY a"
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      unaryNode(
-        "DataSetAggregate",
-        unaryNode(
-          "DataSetCalc",
-          batchTableNode(0),
-          term("select", "a", "b")
-        ),
-        term("groupBy", "a", "b"),
-        term("select", "a", "b", "COUNT(*) AS EXPR$1")
-      ),
-      term("groupBy", "a"),
-      term("select", "a", "SUM(EXPR$1) AS EXPR$1", "SUM(b) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testTwoDistinctAggregateWithGroupingAndCountStar(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      binaryNode(
-        "DataSetJoin",
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(0),
-            term("select", "a", "b")
-          ),
-          term("groupBy", "a"),
-          term("select", "a", "COUNT(*) AS EXPR$1")
-        ),
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetDistinct",
-            unaryNode(
-              "DataSetCalc",
-              batchTableNode(0),
-              term("select", "a", "b")
-            ),
-            term("distinct", "a, b")
-          ),
-          term("groupBy", "a"),
-          term("select", "a, SUM(b) AS EXPR$2, COUNT(b) AS EXPR$3")
-        ),
-        term("where", "IS NOT DISTINCT FROM(a, a0)"),
-        term("join", "a, EXPR$1, a0, EXPR$2, EXPR$3"),
-        term("joinType", "InnerJoin")
-      ),
-      term("select", "a, EXPR$1, EXPR$2, EXPR$3")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testTwoDifferentDistinctAggregateWithGroupingAndCountStar(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      binaryNode(
-        "DataSetJoin",
-        unaryNode(
-          "DataSetCalc",
-          binaryNode(
-            "DataSetJoin",
-            unaryNode(
-              "DataSetAggregate",
-              batchTableNode(0),
-              term("groupBy", "a"),
-              term("select", "a, COUNT(*) AS EXPR$1")
-            ),
-            unaryNode(
-              "DataSetAggregate",
-              unaryNode(
-                "DataSetDistinct",
-                unaryNode(
-                  "DataSetCalc",
-                  batchTableNode(0),
-                  term("select", "a", "b")
-                ),
-                term("distinct", "a, b")
-              ),
-              term("groupBy", "a"),
-              term("select", "a, SUM(b) AS EXPR$2")
-            ),
-            term("where", "IS NOT DISTINCT FROM(a, a0)"),
-            term("join", "a, EXPR$1, a0, EXPR$2"),
-            term("joinType", "InnerJoin")
-          ),
-          term("select", "a, EXPR$1, EXPR$2")
-        ),
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetDistinct",
-            unaryNode(
-              "DataSetCalc",
-              batchTableNode(0),
-              term("select", "a", "c")
-            ),
-            term("distinct", "a, c")
-          ),
-          term("groupBy", "a"),
-          term("select", "a, COUNT(c) AS EXPR$3")
-        ),
-        term("where", "IS NOT DISTINCT FROM(a, a0)"),
-        term("join", "a, EXPR$1, EXPR$2, a0, EXPR$3"),
-        term("joinType", "InnerJoin")
-      ),
-      term("select", "a, EXPR$1, EXPR$2, EXPR$3")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-}

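Every test deleted above follows the same plan-verification pattern: register a table, build the expected DataSet operator tree with the unaryNode/binaryNode/term helpers, and let verifySql compare it against the optimized plan of the SQL query. A minimal, self-contained sketch of that pattern — assuming the TableTestBase utilities keep the signatures used throughout this diff:

  import org.apache.flink.api.scala._
  import org.apache.flink.table.api.scala._
  import org.apache.flink.table.utils.TableTestBase
  import org.apache.flink.table.utils.TableTestUtil._
  import org.junit.Test

  class PlanPatternSketch extends TableTestBase {

    @Test
    def testSimpleProjection(): Unit = {
      val util = batchTestUtil()
      util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

      // Expected plan: a single DataSetCalc projecting column 'a' on top of the table scan.
      val expected = unaryNode(
        "DataSetCalc",
        batchTableNode(0),
        term("select", "a")
      )

      util.verifySql("SELECT a FROM MyTable", expected)
    }
  }
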
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/ExpressionReductionTest.scala
deleted file mode 100644
index 5d5d7f0..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
-  @Test
-  def testReduceCalcExpression(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest'), " +
-      "CAST(TRUE AS VARCHAR) || 'X'" +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "+(7, a) AS EXPR$0",
-        "+(b, 3) AS EXPR$1",
-        "'b' AS EXPR$2",
-        "'STRING' AS EXPR$3",
-        "'teststring' AS EXPR$4",
-        "null AS EXPR$5",
-        "1990-10-24 23:00:01.123 AS EXPR$6",
-        "19 AS EXPR$7",
-        "false AS EXPR$8",
-        "true AS EXPR$9",
-        "2 AS EXPR$10",
-        "true AS EXPR$11",
-        "'trueX' AS EXPR$12"
-      ),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceProjectExpression(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest'), " +
-      "CAST(TRUE AS VARCHAR) || 'X'" +
-      "FROM MyTable"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "+(7, a) AS EXPR$0",
-        "+(b, 3) AS EXPR$1",
-        "'b' AS EXPR$2",
-        "'STRING' AS EXPR$3",
-        "'teststring' AS EXPR$4",
-        "null AS EXPR$5",
-        "1990-10-24 23:00:01.123 AS EXPR$6",
-        "19 AS EXPR$7",
-        "false AS EXPR$8",
-        "true AS EXPR$9",
-        "2 AS EXPR$10",
-        "true AS EXPR$11",
-        "'trueX' AS EXPR$12"
-      )
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceFilterExpression(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "*" +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/GroupingSetsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/GroupingSetsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/GroupingSetsTest.scala
deleted file mode 100644
index c12e5a6..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/GroupingSetsTest.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class GroupingSetsTest extends TableTestBase {
-
-  @Test
-  def testGroupingSets(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g FROM MyTable " +
-      "GROUP BY GROUPING SETS (b, c)"
-
-    val aggregate = unaryNode(
-      "DataSetCalc",
-      binaryNode(
-        "DataSetUnion",
-        unaryNode(
-          "DataSetAggregate",
-          batchTableNode(0),
-          term("groupBy", "b"),
-          term("select", "b", "AVG(a) AS c")
-        ),
-        unaryNode(
-          "DataSetAggregate",
-          batchTableNode(0),
-          term("groupBy", "c"),
-          term("select", "c AS b", "AVG(a) AS c")
-        ),
-        term("union", "b", "c", "i$b", "i$c", "a")
-      ),
-      term("select",
-        "CASE(i$b, null, b) AS b",
-        "CASE(i$c, null, c) AS c",
-        "a",
-        "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS g") // GROUP_ID()
-    )
-
-    util.verifySql(sqlQuery, aggregate)
-  }
-
-  @Test
-  def testCube(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g, " +
-      "GROUPING(b) as gb, GROUPING(c) as gc, " +
-      "GROUPING_ID(b) as gib, GROUPING_ID(c) as gic, " +
-      "GROUPING_ID(b, c) as gid " +
-      "FROM MyTable " +
-      "GROUP BY CUBE (b, c)"
-
-    val group1 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "b, c"),
-      term("select", "b", "c",
-           "AVG(a) AS i$b")
-    )
-
-    val group2 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "b"),
-      term("select", "b",
-           "AVG(a) AS c")
-    )
-
-    val group3 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "c"),
-      term("select", "c AS b",
-           "AVG(a) AS c")
-    )
-
-    val group4 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("select",
-           "AVG(a) AS b")
-    )
-
-    val union1 = binaryNode(
-      "DataSetUnion",
-      group1, group2,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val union2 = binaryNode(
-      "DataSetUnion",
-      union1, group3,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val union3 = binaryNode(
-      "DataSetUnion",
-      union2, group4,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val aggregate = unaryNode(
-      "DataSetCalc",
-      union3,
-      term("select",
-           "CASE(i$b, null, b) AS b",
-           "CASE(i$c, null, c) AS c",
-           "a",
-           "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS g", // GROUP_ID()
-           "CASE(i$b, 1, 0) AS gb", // GROUPING(b)
-           "CASE(i$c, 1, 0) AS gc", // GROUPING(c)
-           "CASE(i$b, 1, 0) AS gib", // GROUPING_ID(b)
-           "CASE(i$c, 1, 0) AS gic", // GROUPING_ID(c)
-           "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS gid") // GROUPING_ID(b, c)
-    )
-
-    util.verifySql(sqlQuery, aggregate)
-  }
-
-  @Test
-  def testRollup(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g, " +
-                   "GROUPING(b) as gb, GROUPING(c) as gc, " +
-                   "GROUPING_ID(b) as gib, GROUPING_ID(c) as gic, " +
-                   "GROUPING_ID(b, c) as gid " + " FROM MyTable " +
-                   "GROUP BY ROLLUP (b, c)"
-
-    val group1 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "b, c"),
-      term("select", "b", "c",
-           "AVG(a) AS i$b")
-    )
-
-    val group2 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "b"),
-      term("select", "b",
-           "AVG(a) AS c")
-    )
-
-    val group3 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("select",
-           "AVG(a) AS b")
-    )
-
-    val union1 = binaryNode(
-      "DataSetUnion",
-      group1, group2,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val union2 = binaryNode(
-      "DataSetUnion",
-      union1, group3,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val aggregate = unaryNode(
-      "DataSetCalc",
-      union2,
-      term("select",
-           "CASE(i$b, null, b) AS b",
-           "CASE(i$c, null, c) AS c",
-           "a",
-           "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS g", // GROUP_ID()
-           "CASE(i$b, 1, 0) AS gb", // GROUPING(b)
-           "CASE(i$c, 1, 0) AS gc", // GROUPING(c)
-           "CASE(i$b, 1, 0) AS gib", // GROUPING_ID(b)
-           "CASE(i$c, 1, 0) AS gic", // GROUPING_ID(c)
-           "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS gid") // GROUPING_ID(b, c)
-    )
-
-    util.verifySql(sqlQuery, aggregate)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
deleted file mode 100644
index a56bbed..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class QueryDecorrelationTest extends TableTestBase {
-
-  @Test
-  def testCorrelationScalarAggAndFilter(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, String, String, Int, Int)]("emp", 'empno, 'ename, 'job, 'salary, 'deptno)
-    util.addTable[(Int, String)]("dept", 'deptno, 'name)
-
-    val sql = "SELECT e1.empno\n" +
-        "FROM emp e1, dept d1 where e1.deptno = d1.deptno\n" +
-        "and e1.deptno < 10 and d1.deptno < 15\n" +
-        "and e1.salary > (select avg(salary) from emp e2 where e1.empno = e2.empno)"
-
-    val expectedQuery = unaryNode(
-      "DataSetCalc",
-      binaryNode(
-        "DataSetJoin",
-        unaryNode(
-          "DataSetCalc",
-          binaryNode(
-            "DataSetJoin",
-            unaryNode(
-              "DataSetCalc",
-              batchTableNode(0),
-              term("select", "empno", "salary", "deptno"),
-              term("where", "<(deptno, 10)")
-            ),
-            unaryNode(
-              "DataSetCalc",
-              batchTableNode(1),
-              term("select", "deptno"),
-              term("where", "<(deptno, 15)")
-            ),
-            term("where", "=(deptno, deptno0)"),
-            term("join", "empno", "salary", "deptno", "deptno0"),
-            term("joinType", "InnerJoin")
-          ),
-          term("select", "empno", "salary")
-        ),
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(0),
-            term("select", "empno", "salary"),
-            term("where", "IS NOT NULL(empno)")
-          ),
-          term("groupBy", "empno"),
-          term("select", "empno", "AVG(salary) AS EXPR$0")
-        ),
-        term("where", "AND(=(empno, empno0), >(salary, EXPR$0))"),
-        term("join", "empno", "salary", "empno0", "EXPR$0"),
-        term("joinType", "InnerJoin")
-      ),
-      term("select", "empno")
-    )
-
-    util.verifySql(sql, expectedQuery)
-  }
-
-  @Test
-  def testDecorrelateWithMultiAggregate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, String, String, Int, Int)]("emp", 'empno, 'ename, 'job, 'salary, 'deptno)
-    util.addTable[(Int, String)]("dept", 'deptno, 'name)
-
-    val sql = "select sum(e1.empno) from emp e1, dept d1 " +
-        "where e1.deptno = d1.deptno " +
-        "and e1.salary > (" +
-        "    select avg(e2.salary) from emp e2 where e2.deptno = d1.deptno" +
-        ")"
-
-    val expectedQuery = unaryNode(
-      "DataSetAggregate",
-      binaryNode(
-        "DataSetUnion",
-        values(
-          "DataSetValues",
-          tuples(List(null)),
-          term("values", "empno")
-        ),
-        unaryNode(
-          "DataSetCalc",
-          binaryNode(
-            "DataSetJoin",
-            unaryNode(
-              "DataSetCalc",
-              binaryNode(
-                "DataSetJoin",
-                unaryNode(
-                  "DataSetCalc",
-                  batchTableNode(0),
-                  term("select", "empno", "salary", "deptno")
-                ),
-                unaryNode(
-                  "DataSetCalc",
-                  batchTableNode(1),
-                  term("select", "deptno")
-                ),
-                term("where", "=(deptno, deptno0)"),
-                term("join", "empno", "salary", "deptno", "deptno0"),
-                term("joinType", "InnerJoin")
-              ),
-              term("select", "empno", "salary", "deptno0")
-            ),
-            unaryNode(
-              "DataSetAggregate",
-              unaryNode(
-                "DataSetCalc",
-                batchTableNode(0),
-                term("select", "salary", "deptno"),
-                term("where", "IS NOT NULL(deptno)")
-              ),
-              term("groupBy", "deptno"),
-              term("select", "deptno", "AVG(salary) AS EXPR$0")
-            ),
-            term("where", "AND(=(deptno0, deptno), >(salary, EXPR$0))"),
-            term("join", "empno", "salary", "deptno0", "deptno", "EXPR$0"),
-            term("joinType", "InnerJoin")
-          ),
-          term("select", "empno")
-        ),
-        term("union", "empno")
-      ),
-      term("select", "SUM(empno) AS EXPR$0")
-    )
-
-    util.verifySql(sql, expectedQuery)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
deleted file mode 100644
index ba0326a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class SetOperatorsTest extends TableTestBase {
-
-  @Test
-  def testMinusWithNestedTypes(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Long, (Int, String), Array[Boolean])]("MyTable", 'a, 'b, 'c)
-
-    val expected = binaryNode(
-      "DataSetMinus",
-      batchTableNode(0),
-      batchTableNode(0),
-      term("minus", "a", "b", "c")
-    )
-
-    val result = t.minus(t)
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testExists(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Long, Int, String)]("A", 'a_long, 'a_int, 'a_string)
-    util.addTable[(Long, Int, String)]("B", 'b_long, 'b_int, 'b_string)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      binaryNode(
-        "DataSetJoin",
-        batchTableNode(0),
-        unaryNode(
-          "DataSetCalc",
-          unaryNode(
-            "DataSetAggregate",
-            unaryNode(
-              "DataSetCalc",
-              batchTableNode(1),
-              term("select", "b_long AS b_long3", "true AS $f0"),
-              term("where", "IS NOT NULL(b_long)")
-            ),
-            term("groupBy", "b_long3"),
-            term("select", "b_long3", "MIN($f0) AS $f1")
-          ),
-          term("select", "b_long3")
-        ),
-        term("where", "=(a_long, b_long3)"),
-        term("join", "a_long", "a_int", "a_string", "b_long3"),
-        term("joinType", "InnerJoin")
-      ),
-      term("select", "a_int", "a_string")
-    )
-
-    util.verifySql(
-      "SELECT a_int, a_string FROM A WHERE EXISTS(SELECT * FROM B WHERE a_long = b_long)",
-      expected
-    )
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index e091da2..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2}
-import org.apache.flink.table.utils._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
-  @Test
-  def testCrossJoin(): Unit = {
-    val util = batchTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-
-    // test overloading
-
-    val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
-
-    val expected2 = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "func1($cor0.c, '$')"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery2, expected2)
-  }
-
-  @Test
-  def testLeftOuterJoin(): Unit = {
-    val util = batchTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "LEFT")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testCustomType(): Unit = {
-    val util = batchTestUtil()
-    val func2 = new TableFunc2
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func2", func2)
-
-    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testHierarchyType(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = new HierarchyTableFunction
-    util.addFunction("hierarchy", function)
-
-    val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "hierarchy($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testPojoType(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = new PojoTableFunc
-    util.addFunction("pojo", function)
-
-    val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "pojo($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " INTEGER age, VARCHAR(2147483647) name)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "name", "age")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testFilter(): Unit = {
-    val util = batchTestUtil()
-    val func2 = new TableFunc2
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func2", func2)
-
-    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
-      "WHERE len > 2"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
-        term("joinType", "INNER"),
-        term("condition", ">($1, 2)")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-
-  @Test
-  def testScalarFunction(): Unit = {
-    val util = batchTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-}

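TableFunc1, TableFunc2, HierarchyTableFunction, and PojoTableFunc are test utilities whose definitions are not part of this diff. As a rough illustration only, a user-defined table function of the kind these tests register could look like the following — a hypothetical stand-in, assuming the 1.3-era org.apache.flink.table.functions.TableFunction API (one eval method per overload, rows emitted via collect):

  import java.util.regex.Pattern

  import org.apache.flink.table.functions.TableFunction

  // Hypothetical stand-in for the TableFunc1 utility (not taken from this commit):
  // emits one row per token of the input string.
  class SplitFunc extends TableFunction[String] {

    def eval(str: String): Unit = {
      str.split(" ").foreach(collect)
    }

    // Overload corresponding to the "test overloading" query above,
    // which passes an explicit separator such as '$'.
    def eval(str: String, separator: String): Unit = {
      // Pattern.quote so the separator is taken literally, not as a regex.
      str.split(Pattern.quote(separator)).foreach(collect)
    }
  }
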
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
deleted file mode 100644
index 64075ef..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.sql
-
-import java.sql.Timestamp
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class WindowAggregateTest extends TableTestBase {
-
-  @Test
-  def testNonPartitionedTumbleWindow(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
-
-    val sqlQuery =
-      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB FROM T GROUP BY TUMBLE(ts, INTERVAL '2' HOUR)"
-
-    val expected =
-      unaryNode(
-        "DataSetWindowAggregate",
-        unaryNode(
-          "DataSetCalc",
-          batchTableNode(0),
-          term("select", "ts, a, b")
-        ),
-        term("window", TumblingGroupWindow('w$, 'ts, 7200000.millis)),
-        term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
-      )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testPartitionedTumbleWindow(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
-
-    val sqlQuery =
-      "SELECT " +
-        "  TUMBLE_START(ts, INTERVAL '4' MINUTE), " +
-        "  TUMBLE_END(ts, INTERVAL '4' MINUTE), " +
-        "  c, " +
-        "  SUM(a) AS sumA, " +
-        "  MIN(b) AS minB " +
-        "FROM T " +
-        "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetWindowAggregate",
-          batchTableNode(0),
-          term("groupBy", "c"),
-          term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)),
-          term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " +
-            "start('w$) AS w$start, end('w$) AS w$end")
-        ),
-        term("select", "CAST(w$start) AS w$start, CAST(w$end) AS w$end, c, sumA, minB")
-      )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testTumbleWindowWithUdAgg() = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
-
-    val weightedAvg = new WeightedAvgWithMerge
-    util.tableEnv.registerFunction("weightedAvg", weightedAvg)
-
-    val sql = "SELECT weightedAvg(b, a) AS wAvg " +
-      "FROM T " +
-      "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)"
-
-    val expected =
-      unaryNode(
-        "DataSetWindowAggregate",
-        unaryNode(
-          "DataSetCalc",
-          batchTableNode(0),
-          term("select", "ts, b, a")
-        ),
-        term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)),
-        term("select", "weightedAvg(b, a) AS wAvg")
-      )
-
-    util.verifySql(sql, expected)
-  }
-
-  @Test
-  def testNonPartitionedHopWindow(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
-
-    val sqlQuery =
-      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
-        "FROM T " +
-        "GROUP BY HOP(ts, INTERVAL '15' MINUTE, INTERVAL '90' MINUTE)"
-
-    val expected =
-      unaryNode(
-        "DataSetWindowAggregate",
-        unaryNode(
-          "DataSetCalc",
-          batchTableNode(0),
-          term("select", "ts, a, b")
-        ),
-        term("window",
-          SlidingGroupWindow('w$, 'ts, 5400000.millis, 900000.millis)),
-        term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
-      )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testPartitionedHopWindow(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Long, Timestamp)]("T", 'a, 'b, 'c, 'd, 'ts)
-
-    val sqlQuery =
-      "SELECT " +
-        "  c, " +
-        "  HOP_END(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " +
-        "  HOP_START(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " +
-        "  SUM(a) AS sumA, " +
-        "  AVG(b) AS avgB " +
-        "FROM T " +
-        "GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), d, c"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetWindowAggregate",
-          batchTableNode(0),
-          term("groupBy", "c, d"),
-          term("window",
-            SlidingGroupWindow('w$, 'ts, 10800000.millis, 3600000.millis)),
-          term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " +
-            "start('w$) AS w$start, end('w$) AS w$end")
-        ),
-        term("select", "c, CAST(w$end) AS w$end, CAST(w$start) AS w$start, sumA, avgB")
-      )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testNonPartitionedSessionWindow(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
-
-    val sqlQuery =
-      "SELECT COUNT(*) AS cnt FROM T GROUP BY SESSION(ts, INTERVAL '30' MINUTE)"
-
-    val expected =
-      unaryNode(
-        "DataSetWindowAggregate",
-        unaryNode(
-          "DataSetCalc",
-          batchTableNode(0),
-          term("select", "ts")
-        ),
-        term("window", SessionGroupWindow('w$, 'ts, 1800000.millis)),
-        term("select", "COUNT(*) AS cnt")
-      )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testPartitionedSessionWindow(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Int, Timestamp)]("T", 'a, 'b, 'c, 'd, 'ts)
-
-    val sqlQuery =
-      "SELECT " +
-        "  c, d, " +
-        "  SESSION_START(ts, INTERVAL '12' HOUR), " +
-        "  SESSION_END(ts, INTERVAL '12' HOUR), " +
-        "  SUM(a) AS sumA, " +
-        "  MIN(b) AS minB " +
-        "FROM T " +
-        "GROUP BY SESSION(ts, INTERVAL '12' HOUR), c, d"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetWindowAggregate",
-          batchTableNode(0),
-          term("groupBy", "c, d"),
-          term("window", SessionGroupWindow('w$, 'ts, 43200000.millis)),
-          term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " +
-            "start('w$) AS w$start, end('w$) AS w$end")
-        ),
-        term("select", "c, d, CAST(w$start) AS w$start, CAST(w$end) AS w$end, sumA, minB")
-      )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testWindowEndOnly(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
-
-    val sqlQuery =
-      "SELECT " +
-        "  TUMBLE_END(ts, INTERVAL '4' MINUTE)" +
-        "FROM T " +
-        "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetWindowAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(0),
-            term("select", "ts, c")
-          ),
-          term("groupBy", "c"),
-          term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)),
-          term("select", "c, start('w$) AS w$start, end('w$) AS w$end")
-        ),
-        term("select", "CAST(w$end) AS w$end")
-      )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-}

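The window terms in the expected plans above encode the SQL intervals in milliseconds. A quick arithmetic check of the conversions the deleted tests rely on:

  // INTERVAL '2' HOUR    =  2 * 60 * 60 * 1000 =  7200000 millis (TumblingGroupWindow)
  // INTERVAL '4' MINUTE  =  4 * 60 * 1000      =   240000 millis (TumblingGroupWindow)
  // INTERVAL '90' MINUTE = 90 * 60 * 1000      =  5400000 millis (SlidingGroupWindow size)
  // INTERVAL '15' MINUTE = 15 * 60 * 1000      =   900000 millis (SlidingGroupWindow slide)
  // INTERVAL '3' HOUR    =  3 * 60 * 60 * 1000 = 10800000 millis (SlidingGroupWindow size)
  // INTERVAL '1' HOUR    =  1 * 60 * 60 * 1000 =  3600000 millis (SlidingGroupWindow slide)
  // INTERVAL '30' MINUTE = 30 * 60 * 1000      =  1800000 millis (SessionGroupWindow gap)
  // INTERVAL '12' HOUR   = 12 * 60 * 60 * 1000 = 43200000 millis (SessionGroupWindow gap)
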
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/AggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/AggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/AggregationsValidationTest.scala
deleted file mode 100644
index 20d5970..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/AggregationsValidationTest.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.sql.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.OverAgg0
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Test
-
-class AggregationsValidationTest extends TableTestBase {
-
-  /**
-    * OVER clause is necessary for [[OverAgg0]] window function.
-    */
-  @Test(expected = classOf[ValidationException])
-  def testOverAggregation(): Unit = {
-
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
-
-    util.addFunction("overAgg", new OverAgg0)
-
-    val sqlQuery = "SELECT overAgg(b, a) FROM T"
-    util.tableEnv.sql(sqlQuery)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/CalcValidationTest.scala
deleted file mode 100644
index 7d2de19..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/CalcValidationTest.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.sql.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Test
-
-class CalcValidationTest extends TableTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidFields(): Unit = {
-
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT a, foo FROM MyTable"
-
-    util.tableEnv.sql(sqlQuery)
-  }
-}

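The failure above is specifically about the unresolved column foo; selecting only registered fields validates cleanly. A sketch of the passing counterpart under the same setup (the class name is illustrative):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test

class CalcResolutionSketch extends TableTestBase {

  // 'a and 'c are registered fields of MyTable, so validation succeeds
  // and sql() returns a Table without throwing.
  @Test
  def testValidFields(): Unit = {
    val util = batchTestUtil()
    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    util.tableEnv.sql("SELECT a, c FROM MyTable")
  }
}
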
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/JoinValidationTest.scala
deleted file mode 100644
index f4c475b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/JoinValidationTest.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.sql.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.types.Row
-import org.junit.Test
-
-class JoinValidationTest extends TableTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinNonExistingKey(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
-
-    util.tableEnv.sql(sqlQuery)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testJoinNonMatchingKeyTypes(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
-
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinWithAmbiguousFields(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'c)
-
-    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d"
-
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testJoinNoEqualityPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f"
-
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testCrossJoin(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, String)]("Table4", 'a1, 'b1, 'c1)
-
-    val sqlQuery = "SELECT a, a1 FROM Table3 CROSS JOIN Table4"
-
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
-
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
-
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testFullOuterJoinWithNonEquiJoinPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d"
-
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRightOuterJoinWithLocalPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and e > 3"
-
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testLeftOuterJoinWithLocalPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and b > 3"
-
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testFullOuterJoinWithLocalPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and b > 3"
-
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
-  }
-}

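Each case above violates one part of the batch join contract: keys must resolve, key types must match, at least one predicate must be an equality, and outer joins at this point accept only equi-predicates without local conditions. A sketch of a query satisfying all of them (assuming translation succeeds on the test util's environment, as the TableException tests above imply):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.types.Row
import org.junit.Test

class JoinContractSketch extends TableTestBase {

  // b and e are both Long and the predicate is a plain equality, so both
  // validation and translation to a DataSet join should succeed.
  @Test
  def testValidEquiJoin(): Unit = {
    val util = batchTestUtil()
    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
    util.tableEnv.sql("SELECT c, g FROM Table3, Table5 WHERE b = e").toDataSet[Row]
  }
}
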
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/SortValidationTest.scala
deleted file mode 100644
index 61cbfd8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/SortValidationTest.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.sql.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.types.Row
-import org.junit.Test
-
-class SortValidationTest extends TableTestBase {
-
-  @Test(expected = classOf[TableException])
-  def testLimitWithoutOrder(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
-
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
-  }
-}

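In batch SQL at this point, LIMIT is only translatable when the result is globally ordered, so the accepted form adds an ORDER BY. A hedged sketch (assuming ORDER BY with LIMIT plans to a sort with a fetch count, as in the supported sort tests):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.types.Row
import org.junit.Test

class SortLimitSketch extends TableTestBase {

  // With an explicit ORDER BY the limit has a well-defined meaning,
  // so translation does not throw a TableException.
  @Test
  def testLimitWithOrder(): Unit = {
    val util = batchTestUtil()
    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    util.tableEnv.sql("SELECT * FROM MyTable ORDER BY a LIMIT 5").toDataSet[Row]
  }
}
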
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/WindowAggregateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/WindowAggregateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/WindowAggregateValidationTest.scala
deleted file mode 100644
index 20fdbba..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/validation/WindowAggregateValidationTest.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.sql.validation
-
-import java.sql.Timestamp
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Test
-
-class WindowAggregateValidationTest extends TableTestBase {
-
-  /**
-    * An OVER clause is required for the [[OverAgg0]] window function.
-    */
-  @Test(expected = classOf[ValidationException])
-  def testOverAggregation(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
-    util.addFunction("overAgg", new OverAgg0)
-
-    val sqlQuery = "SELECT overAgg(b, a) FROM T GROUP BY TUMBLE(ts, INTERVAL '2' HOUR)"
-
-    util.tableEnv.sql(sqlQuery)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testHopWindowNoOffset(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
-
-    val sqlQuery =
-      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
-        "FROM T " +
-        "GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
-
-    util.verifySql(sqlQuery, "n/a")
-  }
-
-  @Test(expected = classOf[TableException])
-  def testSessionWindowNoOffset(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
-
-    val sqlQuery =
-      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
-        "FROM T " +
-        "GROUP BY SESSION(ts, INTERVAL '2' HOUR, TIME '10:00:00')"
-
-    util.verifySql(sqlQuery, "n/a")
-  }
-
-  @Test(expected = classOf[TableException])
-  def testVariableWindowSize(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
-
-    val sql = "SELECT COUNT(*) " +
-      "FROM T " +
-      "GROUP BY TUMBLE(ts, b * INTERVAL '1' MINUTE)"
-    util.verifySql(sql, "n/a")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testTumbleWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
-
-    val weightedAvg = new WeightedAvgWithMerge
-    util.tableEnv.registerFunction("weightedAvg", weightedAvg)
-
-    val sql = "SELECT weightedAvg(c, a) AS wAvg " +
-      "FROM T " +
-      "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)"
-    util.verifySql(sql, "n/a")
-  }
-}

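Three distinct constraints are covered above: batch group windows take no offset parameter, window sizes must be constants, and UDAGG argument types are checked against the function signature. Dropping the offending TIME offset, for example, yields the supported form — a sketch:

import java.sql.Timestamp

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test

class HopWindowSketch extends TableTestBase {

  // Without the TIME '10:00:00' offset, the HOP window (time attribute,
  // slide, size) is supported and the query validates normally.
  @Test
  def testHopWindowWithoutOffset(): Unit = {
    val util = batchTestUtil()
    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
    util.tableEnv.sql(
      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
        "FROM T " +
        "GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '2' HOUR)")
  }
}
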
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsTest.scala
deleted file mode 100644
index 6ca607b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsTest.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
-  * Tests for aggregate plans.
-  */
-class AggregationsTest extends TableTestBase {
-
-  @Test
-  def testGroupAggregateWithFilter(): Unit = {
-
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val resultTable = sourceTable.groupBy('a)
-      .select('a, 'a.avg, 'b.sum, 'c.count)
-      .where('a === 1)
-
-    val calcNode = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", "=(a, 1)")
-    )
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      calcNode,
-      term("groupBy", "a"),
-      term("select",
-        "a",
-        "AVG(a) AS TMP_0",
-        "SUM(b) AS TMP_1",
-        "COUNT(c) AS TMP_2")
-    )
-
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testAggregate(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-    val resultTable = sourceTable.select('a.avg, 'b.sum, 'c.count)
-
-    val setValues = unaryNode(
-      "DataSetValues",
-      batchTableNode(0),
-      tuples(List(null, null, null)),
-      term("values", "a", "b", "c")
-    )
-    val union = unaryNode(
-      "DataSetUnion",
-      setValues,
-      term("union","a","b","c")
-    )
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      union,
-      term("select",
-        "AVG(a) AS TMP_0",
-        "SUM(b) AS TMP_1",
-        "COUNT(c) AS TMP_2")
-    )
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testAggregateWithFilter(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val resultTable = sourceTable.select('a, 'b, 'c).where('a === 1)
-      .select('a.avg, 'b.sum, 'c.count)
-
-    val calcNode = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      // ReduceExpressionsRule will add cast for Project node by force
-      // if the input of the Project node has constant expression.
-      term("select", "CAST(1) AS a", "b", "c"),
-      term("where", "=(a, 1)")
-    )
-
-    val setValues = unaryNode(
-      "DataSetValues",
-      calcNode,
-      tuples(List(null, null, null)),
-      term("values", "a", "b", "c")
-    )
-
-    val union = unaryNode(
-      "DataSetUnion",
-      setValues,
-      term("union","a","b","c")
-    )
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      union,
-      term("select",
-        "AVG(a) AS TMP_0",
-        "SUM(b) AS TMP_1",
-        "COUNT(c) AS TMP_2")
-    )
-
-    util.verifyTable(resultTable, expected)
-  }
-}

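The expected plans above are assembled from the TableTestUtil string helpers: unaryNode nests a child plan under an operator name, term renders one attribute row, and batchTableNode prints the scan of the n-th registered table. A minimal sketch of how they compose for a bare projection (the exact plan text is decided by the optimizer, so this is illustrative):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test

class PlanDslSketch extends TableTestBase {

  @Test
  def testSimpleProjectionPlan(): Unit = {
    val util = batchTestUtil()
    val t = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)

    // unaryNode/term build the textual plan tree that verifyTable
    // compares against the optimized plan of the given Table.
    val expected = unaryNode(
      "DataSetCalc",
      batchTableNode(0),
      term("select", "a")
    )
    util.verifyTable(t.select('a), expected)
  }
}
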
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CompositeFlatteningTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CompositeFlatteningTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CompositeFlatteningTest.scala
deleted file mode 100644
index ece9acc..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CompositeFlatteningTest.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.table.CompositeFlatteningTest.{TestCaseClass, giveMeCaseClass}
-import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-
-class CompositeFlatteningTest extends TableTestBase {
-
-  @Test
-  def testMultipleFlatteningsTable(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table.select('a.flatten(), 'c, 'b.flatten())
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "a._1 AS a$_1",
-        "a._2 AS a$_2",
-        "c",
-        "b._1 AS b$_1",
-        "b._2 AS b$_2"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testNestedFlattening(): Unit = {
-    val util = batchTestUtil()
-    val table = util
-      .addTable[((((String, TestCaseClass), Boolean), String), String)]("MyTable", 'a, 'b)
-
-    val result = table.select('a.flatten(), 'b.flatten())
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "a._1 AS a$_1",
-        "a._2 AS a$_2",
-        "b"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testScalarFunctionAccess(): Unit = {
-    val util = batchTestUtil()
-    val table = util
-      .addTable[(String, Int)]("MyTable", 'a, 'b)
-
-    val result = table.select(
-      giveMeCaseClass().get("my"),
-      giveMeCaseClass().get("clazz"),
-      giveMeCaseClass().flatten())
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        s"${giveMeCaseClass.functionIdentifier}().my AS _c0",
-        s"${giveMeCaseClass.functionIdentifier}().clazz AS _c1",
-        s"${giveMeCaseClass.functionIdentifier}().my AS _c2",
-        s"${giveMeCaseClass.functionIdentifier}().clazz AS _c3"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-}
-
-object CompositeFlatteningTest {
-
-  case class TestCaseClass(my: String, clazz: Int)
-
-  object giveMeCaseClass extends ScalarFunction {
-    def eval(): TestCaseClass = {
-      TestCaseClass("hello", 42)
-    }
-
-    override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
-      createTypeInformation[TestCaseClass]
-    }
-  }
-}

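flatten() expands every field of a composite type into its own column, named field$subfield in Table API projections; for a ScalarFunction result, the planner instead emits one access expression per field, as the expected plan above shows. A short sketch of the naming convention (the column names follow the pattern in the deleted test, not an independent source):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test

class FlattenSketch extends TableTestBase {

  @Test
  def testFlattenNaming(): Unit = {
    val util = batchTestUtil()
    val t = util.addTable[((Int, Long), String)]("T", 'a, 'b)

    // 'a is a Tuple2, so flattening produces the columns a$_1 and a$_2.
    val expected = unaryNode(
      "DataSetCalc",
      batchTableNode(0),
      term("select", "a._1 AS a$_1", "a._2 AS a$_2", "b")
    )
    util.verifyTable(t.select('a.flatten(), 'b), expected)
  }
}
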
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExplainTest.scala
deleted file mode 100644
index 4a38741..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExplainTest.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.junit.Assert.assertEquals
-import org.junit._
-
-class ExplainTest
-  extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
-
-  val testFilePath = ExplainTest.this.getClass.getResource("/").getFile
-
-  @Test
-  def testFilterWithoutExtended(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements((1, "hello"))
-      .toTable(tEnv, 'a, 'b)
-      .filter("a % 2 = 0")
-
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-  @Test
-  def testFilterWithExtended(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements((1, "hello"))
-      .toTable(tEnv, 'a, 'b)
-      .filter("a % 2 = 0")
-
-    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-  @Test
-  def testJoinWithoutExtended(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
-    val table = table1.join(table2).where("b = d").select("a, c")
-
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-  @Test
-  def testJoinWithExtended(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
-    val table = table1.join(table2).where("b = d").select("a, c")
-
-    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-  @Test
-  def testUnionWithoutExtended(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table = table1.unionAll(table2)
-
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-  @Test
-  def testUnionWithExtended(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table = table1.unionAll(table2)
-
-    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-}

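These tests compare tEnv.explain output against checked-in .out resources; outside a test, the same strings can simply be printed. A sketch using only the calls shown above (a standalone object is assumed for illustration):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object ExplainSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val table = env.fromElements((1, "hello"))
      .toTable(tEnv, 'a, 'b)
      .filter("a % 2 = 0")

    // abstract syntax tree plus optimized logical plan
    println(tEnv.explain(table))
    // the extended variant additionally includes the physical execution plan
    println(tEnv.explain(table, true))
  }
}
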
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExpressionReductionTest.scala
deleted file mode 100644
index c35a454..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
-  @Test
-  def testReduceCalcExpression(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .where('a > (1 + 7))
-      .select((3 + 4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor(),
-              true.cast(Types.STRING) + "X")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "13 AS _c0",
-        "'b' AS _c1",
-        "'STRING' AS _c2",
-        "'teststring' AS _c3",
-        "1990-10-24 23:00:01.123 AS _c4",
-        "false AS _c5",
-        "true AS _c6",
-        "2E0 AS _c7",
-        "'trueX' AS _c8"
-      ),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceProjectExpression(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .select((3 + 4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor(),
-              true.cast(Types.STRING) + "X")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "13 AS _c0",
-        "'b' AS _c1",
-        "'STRING' AS _c2",
-        "'teststring' AS _c3",
-        "1990-10-24 23:00:01.123 AS _c4",
-        "false AS _c5",
-        "true AS _c6",
-        "2E0 AS _c7",
-        "'trueX' AS _c8"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceFilterExpression(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .where('a > (1 + 7))
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-}

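All three tests above pin down the same optimizer behavior: constant subexpressions are folded at planning time, so (3 + 4).toExpr + 6 appears in the plan as the literal 13 and 'a > (1 + 7) as >(a, 8). A reduced sketch of the pattern:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test

class ConstantFoldingSketch extends TableTestBase {

  @Test
  def testFoldedLiteral(): Unit = {
    val util = batchTestUtil()
    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

    // the arithmetic is evaluated during optimization; only the folded
    // literal 13 survives into the DataSetCalc projection
    val expected = unaryNode(
      "DataSetCalc",
      batchTableNode(0),
      term("select", "13 AS _c0")
    )
    util.verifyTable(table.select((3 + 4).toExpr + 6), expected)
  }
}
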
