flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [38/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
Date Thu, 13 Jul 2017 10:18:47 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
deleted file mode 100644
index a174c8b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.table.FieldProjectionTest._
-import org.apache.flink.table.expressions.Upper
-import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{TableTestBase, _}
-import org.junit.Test
-
-/**
-  * Tests for all the situations when we can do fields projection. Like selecting few fields
-  * from a large field count source.
-  */
-class FieldProjectionTest extends TableTestBase {
-
-  @Test
-  def testSimpleSelect(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-    val resultTable = sourceTable.select('a, 'b)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b")
-    )
-
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testSelectAllFields(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-    val resultTable1 = sourceTable.select('*)
-    val resultTable2 = sourceTable.select('a, 'b, 'c, 'd)
-
-    val expected = batchTableNode(0)
-
-    util.verifyTable(resultTable1, expected)
-    util.verifyTable(resultTable2, expected)
-  }
-
-  @Test
-  def testSelectAggregation(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-    val resultTable = sourceTable.select('a.sum, 'b.max)
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      binaryNode(
-        "DataSetUnion",
-        values(
-          "DataSetValues",
-          tuples(List(null, null)),
-          term("values", "a", "b")
-        ),
-        unaryNode(
-          "DataSetCalc",
-          batchTableNode(0),
-          term("select", "a", "b")
-        ),
-        term("union", "a", "b")
-      ),
-      term("select", "SUM(a) AS TMP_0", "MAX(b) AS TMP_1")
-    )
-
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testSelectFunction(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-
-    util.tableEnv.registerFunction("hashCode", MyHashCode)
-
-    val resultTable = sourceTable.select("hashCode(c), b")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", s"${MyHashCode.functionIdentifier}(c) AS _c0", "b")
-    )
-
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testSelectFromGroupedTable(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-    val resultTable = sourceTable.groupBy('a, 'c).select('a)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetDistinct",
-        unaryNode(
-          "DataSetCalc",
-          batchTableNode(0),
-          term("select", "a", "c")
-        ),
-        term("distinct", "a", "c")
-      ),
-      term("select", "a")
-    )
-
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testSelectAllFieldsFromGroupedTable(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-    val resultTable = sourceTable.groupBy('a, 'c).select('a, 'c)
-
-    val expected = unaryNode(
-      "DataSetDistinct",
-      unaryNode(
-        "DataSetCalc",
-        batchTableNode(0),
-        term("select", "a", "c")
-      ),
-      term("distinct", "a", "c")
-    )
-
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testSelectAggregationFromGroupedTable(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-    val resultTable = sourceTable.groupBy('c).select('a.sum)
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(0),
-            term("select", "a", "c")
-          ),
-          term("groupBy", "c"),
-          term("select", "c", "SUM(a) AS TMP_0")
-        ),
-        term("select", "TMP_0")
-      )
-
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testSelectFromGroupedTableWithNonTrivialKey(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-    val resultTable = sourceTable.groupBy(Upper('c) as 'k).select('a.sum)
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(0),
-            term("select", "a", "c", "UPPER(c) AS k")
-          ),
-          term("groupBy", "k"),
-          term("select", "k", "SUM(a) AS TMP_0")
-        ),
-        term("select", "TMP_0")
-      )
-
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testSelectFromGroupedTableWithFunctionKey(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-    val resultTable = sourceTable.groupBy(MyHashCode('c) as 'k).select('a.sum)
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(0),
-            term("select", "a", "c", s"${MyHashCode.functionIdentifier}(c) AS k")
-          ),
-          term("groupBy", "k"),
-          term("select", "k", "SUM(a) AS TMP_0")
-        ),
-        term("select", "TMP_0")
-      )
-
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testSelectFromAggregatedPojoTable(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[WC]("MyTable", 'word, 'frequency)
-    val resultTable = sourceTable
-      .groupBy('word)
-      .select('word, 'frequency.sum as 'frequency)
-      .filter('frequency === 2)
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetAggregate",
-          batchTableNode(0),
-          term("groupBy", "word"),
-          term("select", "word", "SUM(frequency) AS TMP_0")
-        ),
-        term("select", "word, TMP_0 AS frequency"),
-        term("where", "=(TMP_0, 2)")
-      )
-
-    util.verifyTable(resultTable, expected)
-  }
-
-}
-
-object FieldProjectionTest {
-
-  object MyHashCode extends ScalarFunction {
-    def eval(s: String): Int = s.hashCode()
-  }
-
-  case class WC(word: String, frequency: Long)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
deleted file mode 100644
index 84a6738..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.WindowReference
-import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class GroupWindowTest extends TableTestBase {
-
-  //===============================================================================================
-  // Common test
-  //===============================================================================================
-
-  @Test
-  def testEventTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 2.rows on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataSetWindowAggregate",
-      batchTableNode(0),
-      term("groupBy", "string"),
-      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeTumblingGroupWindowOverTimeWithUdAgg(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, myWeightedAvg('long, 'int))
-
-    val expected = unaryNode(
-      "DataSetWindowAggregate",
-      batchTableNode(0),
-      term("groupBy", "string"),
-      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataSetWindowAggregate",
-      batchTableNode(0),
-      term("groupBy", "string"),
-      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataSetWindowAggregate",
-      unaryNode(
-        "DataSetCalc",
-        batchTableNode(0),
-        term("select", "int", "long")
-      ),
-      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 2.rows on 'long as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataSetWindowAggregate",
-      unaryNode(
-        "DataSetCalc",
-        batchTableNode(0),
-        term("select", "int", "long")
-      ),
-      term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  //===============================================================================================
-  // Sliding Windows
-  //===============================================================================================
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataSetWindowAggregate",
-      batchTableNode(0),
-      term("groupBy", "string"),
-      term("window",
-        SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataSetWindowAggregate",
-      batchTableNode(0),
-      term("groupBy", "string"),
-      term("window",
-        SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTimeWithUdAgg(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, myWeightedAvg('long, 'int))
-
-    val expected = unaryNode(
-      "DataSetWindowAggregate",
-      batchTableNode(0),
-      term("groupBy", "string"),
-      term("window",
-           SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'long as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataSetWindowAggregate",
-      unaryNode(
-        "DataSetCalc",
-        batchTableNode(0),
-        term("select", "int", "long")
-      ),
-      term("window",
-        SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'long as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataSetWindowAggregate",
-      unaryNode(
-        "DataSetCalc",
-        batchTableNode(0),
-        term("select", "int", "long")
-      ),
-      term("window",
-        SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  //===============================================================================================
-  // Session Windows
-  //===============================================================================================
-
-  @Test
-  def testEventTimeSessionGroupWindowOverTime(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Session withGap 7.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataSetWindowAggregate",
-      batchTableNode(0),
-      term("groupBy", "string"),
-      term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSessionGroupWindowOverTimeWithUdAgg(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    val windowedTable = table
-      .window(Session withGap 7.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, myWeightedAvg('long, 'int))
-
-    val expected = unaryNode(
-      "DataSetWindowAggregate",
-      batchTableNode(0),
-      term("groupBy", "string"),
-      term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index 079b10a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils._
-import org.junit.Test
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
-  @Test
-  def testCrossJoin(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = util.addFunction("func1", new TableFunc1)
-
-    val result1 = table.join(function('c) as 's).select('c, 's)
-
-    val expected1 = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", s"${function.functionIdentifier}($$2)"),
-        term("function", function),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "s")
-    )
-
-    util.verifyTable(result1, expected1)
-
-    // test overloading
-
-    val result2 = table.join(function('c, "$") as 's).select('c, 's)
-
-    val expected2 = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", s"${function.functionIdentifier}($$2, '$$')"),
-        term("function", function),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "s")
-    )
-
-    util.verifyTable(result2, expected2)
-  }
-
-  @Test
-  def testLeftOuterJoin(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = util.addFunction("func1", new TableFunc1)
-
-    val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", s"${function.functionIdentifier}($$2)"),
-        term("function", function),
-        term("rowType",
-          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
-        term("joinType", "LEFT")
-      ),
-      term("select", "c", "s")
-    )
-
-    util.verifyTable(result, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
deleted file mode 100644
index b085160..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.stringexpr
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMergeAndReset
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
-import org.apache.flink.table.functions.aggfunctions.CountAggFunction
-import org.apache.flink.table.utils.TableTestBase
-import org.junit._
-
-class AggregationsStringExpressionTest extends TableTestBase {
-
-  @Test
-  def testAggregationTypes(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3")
-
-    val t1 = t.select('_1.sum, '_1.sum0, '_1.min, '_1.max, '_1.count, '_1.avg)
-    val t2 = t.select("_1.sum, _1.sum0, _1.min, _1.max, _1.count, _1.avg")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testWorkingAggregationDataTypes(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Byte, Short, Int, Long, Float, Double, String)]("Table7")
-
-    val t1 = t.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
-    val t2 = t.select("_1.avg, _2.avg, _3.avg, _4.avg, _5.avg, _6.avg, _7.count")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testProjection(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Byte, Short)]("Table2")
-
-    val t1 = t.select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
-    val t2 = t.select("_1.avg, _1.sum, _1.count, _2.avg, _2.sum")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testAggregationWithArithmetic(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Long, String)]("Table2")
-
-    val t1 = t.select(('_1 + 2).avg + 2, '_2.count + 5)
-    val t2 = t.select("(_1 + 2).avg + 2, _2.count + 5")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testAggregationWithTwoCount(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Long, String)]("Table2")
-
-    val t1 = t.select('_1.count, '_2.count)
-    val t2 = t.select("_1.count, _2.count")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testAggregationAfterProjection(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Byte, Short, Int, Long, Float, Double, String)]("Table7")
-
-    val t1 = t.select('_1, '_2, '_3)
-      .select('_1.avg, '_2.sum, '_3.count)
-
-    val t2 = t.select("_1, _2, _3")
-      .select("_1.avg, _2.sum, _3.count")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testDistinct(): Unit = {
-    val util = batchTestUtil()
-    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    val distinct = ds.select('b).distinct()
-    val distinct2 = ds.select("b").distinct()
-
-    verifyTableEquals(distinct, distinct2)
-  }
-
-  @Test
-  def testDistinctAfterAggregate(): Unit = {
-    val util = batchTestUtil()
-    val ds = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
-
-    val distinct = ds.groupBy('a, 'e).select('e).distinct()
-    val distinct2 = ds.groupBy("a, e").select("e").distinct()
-
-    val lPlan1 = distinct.logicalPlan
-    val lPlan2 = distinct2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testGroupedAggregate(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val t1 = t.groupBy('b).select('b, 'a.sum)
-    val t2 = t.groupBy("b").select("b, a.sum")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testGroupingKeyForwardIfNotUsed(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val t1 = t.groupBy('b).select('a.sum)
-    val t2 = t.groupBy("b").select("a.sum")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testGroupNoAggregation(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val t1 = t
-      .groupBy('b)
-      .select('a.sum as 'd, 'b)
-      .groupBy('b, 'd)
-      .select('b)
-
-    val t2 = t
-      .groupBy("b")
-      .select("a.sum as d, b")
-      .groupBy("b, d")
-      .select("b")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testGroupedAggregateWithConstant1(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val t1 = t.select('a, 4 as 'four, 'b)
-      .groupBy('four, 'a)
-      .select('four, 'b.sum)
-
-    val t2 = t.select("a, 4 as four, b")
-      .groupBy("four, a")
-      .select("four, b.sum")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testGroupedAggregateWithConstant2(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val t1 = t.select('b, 4 as 'four, 'a)
-      .groupBy('b, 'four)
-      .select('four, 'a.sum)
-    val t2 = t.select("b, 4 as four, a")
-      .groupBy("b, four")
-      .select("four, a.sum")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testGroupedAggregateWithExpression(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
-
-    val t1 = t.groupBy('e, 'b % 3)
-      .select('c.min, 'e, 'a.avg, 'd.count)
-    val t2 = t.groupBy("e, b % 3")
-      .select("c.min, e, a.avg, d.count")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testGroupedAggregateWithFilter(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val t1 = t.groupBy('b)
-      .select('b, 'a.sum)
-      .where('b === 2)
-    val t2 = t.groupBy("b")
-      .select("b, a.sum")
-      .where("b = 2")
-
-    verifyTableEquals(t1, t2)
-  }
-
-  @Test
-  def testAnalyticAggregation(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, Float, Double)]('_1, '_2, '_3, '_4)
-
-    val resScala = t.select(
-      '_1.stddevPop, '_2.stddevPop, '_3.stddevPop, '_4.stddevPop,
-      '_1.stddevSamp, '_2.stddevSamp, '_3.stddevSamp, '_4.stddevSamp,
-      '_1.varPop, '_2.varPop, '_3.varPop, '_4.varPop,
-      '_1.varSamp, '_2.varSamp, '_3.varSamp, '_4.varSamp)
-    val resJava = t.select("""
-      _1.stddevPop, _2.stddevPop, _3.stddevPop, _4.stddevPop,
-      _1.stddevSamp, _2.stddevSamp, _3.stddevSamp, _4.stddevSamp,
-      _1.varPop, _2.varPop, _3.varPop, _4.varPop,
-      _1.varSamp, _2.varSamp, _3.varSamp, _4.varSamp""")
-
-    verifyTableEquals(resScala, resJava)
-  }
-
-  @Test
-  def testAggregateWithUDAGG(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val myCnt = new CountAggFunction
-    util.tableEnv.registerFunction("myCnt", myCnt)
-    val myWeightedAvg = new WeightedAvgWithMergeAndReset
-    util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
-
-    val t1 = t.select(myCnt('a) as 'aCnt, myWeightedAvg('b, 'a) as 'wAvg)
-    val t2 = t.select("myCnt(a) as aCnt, myWeightedAvg(b, a) as wAvg")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    val x = LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString)
-    val y = LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)
-
-    Assert.assertEquals(
-      "Logical Plans do not match",
-      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
-      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
-  }
-
-  @Test
-  def testGroupedAggregateWithUDAGG(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-
-    val myCnt = new CountAggFunction
-    util.tableEnv.registerFunction("myCnt", myCnt)
-    val myWeightedAvg = new WeightedAvgWithMergeAndReset
-    util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
-
-    val t1 = t.groupBy('b)
-      .select('b, myCnt('a) + 9 as 'aCnt, myWeightedAvg('b, 'a) * 2 as 'wAvg, myWeightedAvg('a, 'a))
-    val t2 = t.groupBy("b")
-      .select("b, myCnt(a) + 9 as aCnt, myWeightedAvg(b, a) * 2 as wAvg, myWeightedAvg(a, a)")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals(
-      "Logical Plans do not match",
-      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
-      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
deleted file mode 100644
index 9736ec1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.stringexpr
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.utils.TableTestBase
-import org.junit._
-
-class CalcStringExpressionTest extends TableTestBase {
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    val t1 = t.select('a, 'b, 'c)
-    val t2 = t.select("a, b, c")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3")
-
-    val t1 = t
-      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
-      .select('a, 'b)
-
-    val t2 = t
-      .select("_1 as a, _2 as b, _1 as c")
-      .select("a, b")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testSimpleSelectRenameAll(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3")
-
-    val t1 = t
-      .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
-      .select('a, 'b)
-
-    val t2 = t
-      .select("_1 as a, _2 as b, _3 as c")
-      .select("a, b")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testSelectStar(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    val t1 = t.select('*)
-    val t2 = t.select("*")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-    val util = batchTestUtil()
-    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    val t1 = ds.filter( Literal(false) )
-    val t2 = ds.filter("faLsE")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-    val util = batchTestUtil()
-    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    val t1 = ds.filter( Literal(true) )
-    val t2 = ds.filter("trUe")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testFilterOnStringTupleField(): Unit = {
-    val util = batchTestUtil()
-    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    val t1 = ds.filter( 'c.like("%world%") )
-    val t2 = ds.filter("c.like('%world%')")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField(): Unit = {
-    val util = batchTestUtil()
-    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    val t1 = ds.filter( 'a % 2 === 0 )
-    val t2 = ds.filter( "a % 2 = 0 ")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testNotEquals(): Unit = {
-    val util = batchTestUtil()
-    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    val t1 = ds.filter( 'a % 2 !== 0 )
-    val t2 = ds.filter("a % 2 <> 0")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testDisjunctivePredicate(): Unit = {
-    val util = batchTestUtil()
-    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    val t1 = ds.filter( 'a < 2 || 'a > 20)
-    val t2 = ds.filter("a < 2 || a > 20")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testConsecutiveFilters(): Unit = {
-    val util = batchTestUtil()
-    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    val t1 = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
-    val t2 = ds.filter("a % 2 != 0").filter("b % 2 = 0")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testFilterBasicType(): Unit = {
-    val util = batchTestUtil()
-    val ds = util.addTable[String]("Table3",'a)
-
-    val t1 = ds.filter( 'a.like("H%") )
-    val t2 = ds.filter( "a.like('H%')" )
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testFilterOnCustomType(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[CustomType]("Table3",'myInt as 'i, 'myLong as 'l, 'myString as 's)
-
-    val t1 = t.filter( 's.like("%a%") )
-    val t2 = t.filter("s.like('%a%')")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testSimpleCalc(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3")
-
-    val t1 = t.select('_1, '_2, '_3)
-      .where('_1 < 7)
-      .select('_1, '_3)
-
-    val t2 = t.select("_1, _2, _3")
-      .where("_1 < 7")
-      .select("_1, _3")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testCalcWithTwoFilters(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3")
-
-    val t1 = t.select('_1, '_2, '_3)
-      .where('_1 < 7 && '_2 === 3)
-      .select('_1, '_3)
-      .where('_1 === 4)
-      .select('_1)
-
-    val t2 = t.select("_1, _2, _3")
-      .where("_1 < 7 && _2 = 3")
-      .select("_1, _3")
-      .where("_1 === 4")
-      .select("_1")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testCalcWithAggregation(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3")
-
-    val t1 = t.select('_1, '_2, '_3)
-      .where('_1 < 15)
-      .groupBy('_2)
-      .select('_1.min, '_2.count as 'cnt)
-      .where('cnt > 3)
-
-
-    val t2 = t.select("_1, _2, _3")
-      .where("_1 < 15")
-      .groupBy("_2")
-      .select("_1.min, _2.count as cnt")
-      .where("cnt > 3")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match",
-      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
-      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
-  }
-
-  @Test
-  def testCalcJoin(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1 = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
-      .where('b > 1).select('a, 'd).where('d === 2)
-    val t2 = ds1.select("a, b").join(ds2).where("b = e").select("a, b, d, e, f")
-      .where("b > 1").select("a, d").where("d = 2")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testAdvancedDataTypes(): Unit = {
-    val util = batchTestUtil()
-    val t = util
-      .addTable[(BigDecimal, BigDecimal, Date, Time, Timestamp)]("Table5", 'a, 'b, 'c, 'd, 'e)
-
-    val t1 = t.select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
-        "1984-07-12".cast(Types.SQL_DATE), "14:34:24".cast(Types.SQL_TIME),
-        "1984-07-12 14:34:24".cast(Types.SQL_TIMESTAMP))
-    val t2 = t.select("a, b, c, d, e, 11.2, 11.2," +
-      "'1984-07-12'.toDate, '14:34:24'.toTime," +
-      "'1984-07-12 14:34:24'.toTimestamp")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1.toString, lPlan2.toString)
-  }
-
-  @Test
-  def testIntegerBiggerThan128(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    val t1 = t.filter('a === 300)
-    val t2 = t.filter("a = 300")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
deleted file mode 100644
index 8ca5745..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.stringexpr
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.Types._
-import org.apache.flink.table.api.scala._
-import org.junit._
-
-class CastingStringExpressionTest {
-
-  @Test
-  def testNumericAutoCastInArithmetic() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements(
-      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tableEnv)
-    val t1 = table.select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f,
-      '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
-    val t2 = table.select("_1 + 1, _2 +" +
-      " 1, _3 + 1L, _4 + 1.0f, _5 + 1.0d, _6 + 1, _7 + 1.0d, _8 + _1")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  @throws[Exception]
-  def testNumericAutoCastInComparison() {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements(
-      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d),
-      (2.toByte, 2.toShort, 2, 2L, 2.0f, 2.0d))
-      .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e, 'f)
-    val t1 = table.filter('a > 1 && 'b > 1 && 'c > 1L &&
-      'd > 1.0f && 'e > 1.0d && 'f > 1)
-    val t2 = table
-      .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  @throws[Exception]
-  def testCasting() {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val table = env.fromElements((1, 0.0, 1L, true)).toTable(tableEnv)
-    val t1 = table .select(
-      // * -> String
-      '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING),
-      // NUMERIC TYPE -> Boolean
-      '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), '_3.cast(BOOLEAN),
-      // NUMERIC TYPE -> NUMERIC TYPE
-      '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT),
-      // Boolean -> NUMERIC TYPE
-      '_4.cast(DOUBLE), // identity casting
-      '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN))
-    val t2 = table.select(
-      // * -> String
-      "_1.cast(STRING), _2.cast(STRING), _3.cast(STRING), _4.cast(STRING)," +
-        // NUMERIC TYPE -> Boolean
-        "_1.cast(BOOLEAN), _2.cast(BOOLEAN), _3.cast(BOOLEAN)," +
-        // NUMERIC TYPE -> NUMERIC TYPE
-        "_1.cast(DOUBLE), _2.cast(INT), _3.cast(SHORT)," +
-        // Boolean -> NUMERIC TYPE
-        "_4.cast(DOUBLE)," +
-        // identity casting
-        "_1.cast(INT), _2.cast(DOUBLE), _3.cast(LONG), _4.cast(BOOLEAN)")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  @throws[Exception]
-  def testCastFromString() {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val table = env.fromElements(("1", "true", "2.0")).toTable(tableEnv)
-    val t1 = table .select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG),
-        '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN))
-    val t2 = table.select(
-      "_1.cast(BYTE), _1.cast(SHORT), _1.cast(INT), _1.cast(LONG), " +
-        "_3.cast(DOUBLE), _3.cast(FLOAT), _2.cast(BOOLEAN)")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
deleted file mode 100644
index 067441d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.stringexpr
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.utils.TableTestBase
-import org.junit._
-
-class JoinStringExpressionTest extends TableTestBase {
-
-  @Test
-  def testJoin(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1Scala = ds1.join(ds2).where('b === 'e).select('c, 'g)
-    val t1Java = ds1.join(ds2).where("b === e").select("c, g")
-
-    val lPlan1 = t1Scala.logicalPlan
-    val lPlan2 = t1Java.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testJoinWithFilter(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1Scala = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
-    val t1Java = ds1.join(ds2).where("b === e && b < 2").select("c, g")
-
-    val lPlan1 = t1Scala.logicalPlan
-    val lPlan2 = t1Java.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testJoinWithJoinFilter(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1Scala = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
-    val t1Java = ds1.join(ds2).where("b === e && a < 6 && h < b").select("c, g")
-
-    val lPlan1 = t1Scala.logicalPlan
-    val lPlan2 = t1Java.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testJoinWithMultipleKeys(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1Scala = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
-    val t1Java = ds1.join(ds2).filter("a === d && b === h").select("c, g")
-
-    val lPlan1 = t1Scala.logicalPlan
-    val lPlan2 = t1Java.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testJoinWithAggregation(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1Scala = ds1.join(ds2).where('a === 'd).select('g.count)
-    val t1Java = ds1.join(ds2).where("a === d").select("g.count")
-
-    val lPlan1 = t1Scala.logicalPlan
-    val lPlan2 = t1Java.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match",
-      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
-      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
-  }
-
-  @Test
-  def testJoinWithGroupedAggregation(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1 = ds1.join(ds2)
-      .where('a === 'd)
-      .groupBy('a, 'd)
-      .select('b.sum, 'g.count)
-    val t2 = ds1.join(ds2)
-      .where("a = d")
-      .groupBy("a, d")
-      .select("b.sum, g.count")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match",
-      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
-      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
-  }
-
-  @Test
-  def testJoinPushThroughJoin(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds3 = util.addTable[(Int, Long, String)]("Table4",'j, 'k, 'l)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1 = ds1.join(ds2)
-      .where(Literal(true))
-      .join(ds3)
-      .where('a === 'd && 'e === 'k)
-      .select('a, 'f, 'l)
-    val t2 = ds1.join(ds2)
-      .where("true")
-      .join(ds3)
-      .where("a === d && e === k")
-      .select("a, f, l")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testJoinWithDisjunctivePred(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1 = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10)).select('c, 'g)
-    val t2 = ds1.join(ds2).filter("a = d && (b = e || b = e - 10)").select("c, g")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testJoinWithExpressionPreds(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1 = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
-    val t2 = ds1.join(ds2).filter("b = h + 1 && a - 1 = d + 2").select("c, g")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testLeftJoinWithMultipleKeys(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1 = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
-    val t2 = ds1.leftOuterJoin(ds2, "a = d && b = h").select("c, g")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testRightJoinWithMultipleKeys(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1 = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
-    val t2 = ds1.rightOuterJoin(ds2, "a = d && b = h").select("c, g")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-  @Test
-  def testFullOuterJoinWithMultipleKeys(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val t1 = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
-    val t2 = ds1.fullOuterJoin(ds2, "a = d && b = h").select("c, g")
-
-    val lPlan1 = t1.logicalPlan
-    val lPlan2 = t2.logicalPlan
-
-    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
deleted file mode 100644
index 8cf9979..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table.stringexpr
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
-import org.apache.flink.table.utils.{PojoTableFunc, TableFunc2, _}
-import org.apache.flink.table.api.{Table, Types}
-import org.junit.Test
-
-class UserDefinedTableFunctionStringExpressionTest extends TableTestBase {
-
-  @Test
-  def testJoin(): Unit = {
-    val util = batchTestUtil()
-
-    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
-    val sTab = util.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c)
-    val jTab = util.addJavaTable[Row](typeInfo, "Table2", "a, b, c")
-
-    // test cross join
-    val func1 = new TableFunc1
-    util.javaTableEnv.registerFunction("func1", func1)
-    var scalaTable = sTab.join(func1('c) as 's).select('c, 's)
-    var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test left outer join
-    scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's)
-    javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "as(func1(c), s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test overloading
-    scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's)
-    javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test custom result type
-    val func2 = new TableFunc2
-    util.javaTableEnv.registerFunction("func2", func2)
-    scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len)
-    javaTable = jTab.join(
-      new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test hierarchy generic type
-    val hierarchy = new HierarchyTableFunction
-    util.javaTableEnv.registerFunction("hierarchy", hierarchy)
-    scalaTable = sTab.join(hierarchy('c) as('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
-    javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
-      .select("c, name, len, adult")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test pojo type
-    val pojo = new PojoTableFunc
-    util.javaTableEnv.registerFunction("pojo", pojo)
-    scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age)
-    javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with filter
-    scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len).filter('len > 2)
-    javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)"))
-      .select("c, name, len").filter("len > 2")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with scalar function
-    scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's)
-    javaTable = jTab.join(
-      new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s")
-    verifyTableEquals(scalaTable, javaTable)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
deleted file mode 100644
index 25ecd96..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMergeAndReset}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
-import org.apache.flink.table.utils.TableTestBase
-import org.junit._
-
-class AggregationsValidationTest extends TableTestBase {
-
-  /**
-    * OVER clause is necessary for [[OverAgg0]] window function.
-    */
-  @Test(expected = classOf[ValidationException])
-  def testOverAggregation(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    val overAgg = new OverAgg0
-    t.select('c.count, overAgg('b, 'a))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNonWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    env.fromElements(("Hello", 1)).toTable(tEnv)
-      // Must fail. Field '_1 is not a numeric type.
-      .select('_1.sum)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNoNestedAggregations(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(String, Int)]("Table2")
-
-    // Must fail. Sum aggregation can not be chained.
-    t.select('_2.sum.sum)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupingOnNonExistentField(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    // must fail. '_foo not a valid field
-    t.groupBy('_foo)
-    .select('a.avg)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupingInvalidSelection(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    t.groupBy('a, 'b)
-    // must fail. 'c is not a grouping key or aggregation
-    .select('c)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testAggregationOnNonExistingField(): Unit = {
-
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    // Must fail. Field 'foo does not exist.
-    t.select('foo.avg)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testInvalidUdAggArgs() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val myWeightedAvg = new WeightedAvgWithMergeAndReset
-
-    // must fail. UDAGG does not accept String type
-    t.select(myWeightedAvg('c, 'a))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testGroupingInvalidUdAggArgs() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val myWeightedAvg = new WeightedAvgWithMergeAndReset
-
-    t.groupBy('b)
-    // must fail. UDAGG does not accept String type
-    .select(myWeightedAvg('c, 'a))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testGroupingNestedUdAgg() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val myWeightedAvg = new WeightedAvgWithMergeAndReset
-
-    t.groupBy('c)
-    // must fail. UDAGG calls cannot be nested
-    .select(myWeightedAvg(myWeightedAvg('b, 'a), 'a))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testAggregationOnNonExistingFieldJava() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    t.select("foo.avg")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testNonWorkingAggregationDataTypesJava() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Long, String)]("Table2",'b, 'c)
-    // Must fail. Cannot compute SUM aggregate on String field.
-    t.select("c.sum")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testNoNestedAggregationsJava() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Long, String)]("Table2",'b, 'c)
-    // Must fail. Aggregation on aggregation not allowed.
-    t.select("b.sum.sum")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testGroupingOnNonExistentFieldJava() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    // must fail. Field foo is not in input
-    t.groupBy("foo")
-    .select("a.avg")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testGroupingInvalidSelectionJava() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    t.groupBy("a, b")
-    // must fail. Field c is not a grouping key or aggregation
-    .select("c")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testUnknownUdAggJava() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    // must fail. unknown is not known
-    t.select("unknown(c)")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testGroupingUnknownUdAggJava() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    t.groupBy("a, b")
-    // must fail. unknown is not known
-    .select("unknown(c)")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testInvalidUdAggArgsJava() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val myWeightedAvg = new WeightedAvgWithMergeAndReset
-    util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
-
-    // must fail. UDAGG does not accept String type
-    t.select("myWeightedAvg(c, a)")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testGroupingInvalidUdAggArgsJava() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val myWeightedAvg = new WeightedAvgWithMergeAndReset
-    util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
-
-    t.groupBy("b")
-    // must fail. UDAGG does not accept String type
-    .select("myWeightedAvg(c, a)")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
deleted file mode 100644
index e2a5dac..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit._
-
-class CalcValidationTest extends TableTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectInvalidFieldFields(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-      // must fail. Field 'foo does not exist
-      .select('a, 'foo)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectAmbiguousRenaming(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'foo
-      .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectAmbiguousRenaming2(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'a
-      .select('a, 'b as 'a).toDataSet[Row].print()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testFilterInvalidFieldName(): Unit = {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    // must fail. Field 'foo does not exist
-    t.filter( 'foo === 2 )
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testSelectInvalidField() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    // Must fail. Field foo does not exist
-    t.select("a + 1, foo + 2")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testSelectAmbiguousFieldNames() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    // Must fail. Both expressions are renamed to foo
-    t.select("a + 1 as foo, b + 2 as foo")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testFilterInvalidField() {
-    val util = batchTestUtil()
-    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
-    // Must fail. Field foo does not exist.
-    t.filter("foo = 17")
-  }
-
-  @Test
-  def testAliasStarException(): Unit = {
-    val util = batchTestUtil()
-
-    try {
-      util.addTable[(Int, Long, String)]("Table1", '*, 'b, 'c)
-      fail("TableException expected")
-    } catch {
-      case _: TableException => //ignore
-    }
-
-    try {
-      util.addTable[(Int, Long, String)]("Table2")
-      .select('_1 as '*, '_2 as 'b, '_1 as 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      util.addTable[(Int, Long, String)]("Table3").as('*, 'b, 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-    try {
-      util.addTable[(Int, Long, String)]("Table4", 'a, 'b, 'c).select('*, 'b)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala
deleted file mode 100644
index 0ac205a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.junit.Test
-
-class CompositeFlatteningValidationTest extends TableTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testDuplicateFlattening(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
-    table.select('a.flatten(), 'a.flatten())
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala
deleted file mode 100644
index b8e2b97..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Test
-
-class GroupWindowValidationTest extends TableTestBase {
-
-  //===============================================================================================
-  // Common test
-  //===============================================================================================
-
-  /**
-    * OVER clause is necessary for [[OverAgg0]] window function.
-    */
-  @Test(expected = classOf[ValidationException])
-  def testOverAggregation(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-    val overAgg = new OverAgg0
-    table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('string,'w)
-      .select(overAgg('long, 'int))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupByWithoutWindowAlias(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('string)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidRowTimeRef(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-      .window(Slide over 5.milli every 1.milli on 'int as 'w2) // 'int does not exist in input.
-      .groupBy('w2)
-      .select('string)
-  }
-
-  //===============================================================================================
-  // Tumbling Windows
-  //===============================================================================================
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidProcessingTimeDefinition(): Unit = {
-    val util = batchTestUtil()
-    // proctime is not allowed
-    util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidProcessingTimeDefinition2(): Unit = {
-    val util = batchTestUtil()
-    // proctime is not allowed
-    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidEventTimeDefinition(): Unit = {
-    val util = batchTestUtil()
-    // definition must not extend schema
-    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    table
-      .window(Tumble over 2.minutes on 'rowtime as 'w)
-      .groupBy('w, 'long)
-      // invalid function arguments
-      .select(myWeightedAvg('int, 'string))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testAllTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    table
-      .window(Tumble over 2.minutes on 'rowtime as 'w)
-      .groupBy('w)
-      // invalid function arguments
-      .select(myWeightedAvg('int, 'string))
-  }
-
-  //===============================================================================================
-  // Sliding Windows
-  //===============================================================================================
-
-  @Test(expected = classOf[ValidationException])
-  def testSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    table
-      .window(Slide over 2.minutes every 1.minute on 'rowtime as 'w)
-      .groupBy('w, 'long)
-      // invalid function arguments
-      .select(myWeightedAvg('int, 'string))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testAllSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    table
-      .window(Slide over 2.minutes every 1.minute on 'long as 'w)
-      .groupBy('w)
-      // invalid function arguments
-      .select(myWeightedAvg('int, 'string))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    table
-      .window(Session withGap 2.minutes on 'rowtime as 'w)
-      .groupBy('w, 'long)
-      // invalid function arguments
-      .select(myWeightedAvg('int, 'string))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testAllSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    table
-      .window(Session withGap 2.minutes on 'rowtime as 'w)
-      .groupBy('w)
-      // invalid function arguments
-      .select(myWeightedAvg('int, 'string))
-  }
-
-}


Mime
View raw message