http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
deleted file mode 100644
index a174c8b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.table.FieldProjectionTest._
-import org.apache.flink.table.expressions.Upper
-import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{TableTestBase, _}
-import org.junit.Test
-
-/**
- * Tests for all the situations when we can do fields projection. Like selecting few fields
- * from a large field count source.
- */
-class FieldProjectionTest extends TableTestBase {
-
- @Test
- def testSimpleSelect(): Unit = {
- val util = batchTestUtil()
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.select('a, 'b)
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "b")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectAllFields(): Unit = {
- val util = batchTestUtil()
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable1 = sourceTable.select('*)
- val resultTable2 = sourceTable.select('a, 'b, 'c, 'd)
-
- val expected = batchTableNode(0)
-
- util.verifyTable(resultTable1, expected)
- util.verifyTable(resultTable2, expected)
- }
-
- @Test
- def testSelectAggregation(): Unit = {
- val util = batchTestUtil()
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.select('a.sum, 'b.max)
-
- val expected = unaryNode(
- "DataSetAggregate",
- binaryNode(
- "DataSetUnion",
- values(
- "DataSetValues",
- tuples(List(null, null)),
- term("values", "a", "b")
- ),
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "b")
- ),
- term("union", "a", "b")
- ),
- term("select", "SUM(a) AS TMP_0", "MAX(b) AS TMP_1")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectFunction(): Unit = {
- val util = batchTestUtil()
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
-
- util.tableEnv.registerFunction("hashCode", MyHashCode)
-
- val resultTable = sourceTable.select("hashCode(c), b")
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", s"${MyHashCode.functionIdentifier}(c) AS _c0", "b")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectFromGroupedTable(): Unit = {
- val util = batchTestUtil()
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.groupBy('a, 'c).select('a)
-
- val expected = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetDistinct",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "c")
- ),
- term("distinct", "a", "c")
- ),
- term("select", "a")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectAllFieldsFromGroupedTable(): Unit = {
- val util = batchTestUtil()
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.groupBy('a, 'c).select('a, 'c)
-
- val expected = unaryNode(
- "DataSetDistinct",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "c")
- ),
- term("distinct", "a", "c")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectAggregationFromGroupedTable(): Unit = {
- val util = batchTestUtil()
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.groupBy('c).select('a.sum)
-
- val expected =
- unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "c")
- ),
- term("groupBy", "c"),
- term("select", "c", "SUM(a) AS TMP_0")
- ),
- term("select", "TMP_0")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectFromGroupedTableWithNonTrivialKey(): Unit = {
- val util = batchTestUtil()
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.groupBy(Upper('c) as 'k).select('a.sum)
-
- val expected =
- unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "c", "UPPER(c) AS k")
- ),
- term("groupBy", "k"),
- term("select", "k", "SUM(a) AS TMP_0")
- ),
- term("select", "TMP_0")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectFromGroupedTableWithFunctionKey(): Unit = {
- val util = batchTestUtil()
- val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
- val resultTable = sourceTable.groupBy(MyHashCode('c) as 'k).select('a.sum)
-
- val expected =
- unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "c", s"${MyHashCode.functionIdentifier}(c) AS k")
- ),
- term("groupBy", "k"),
- term("select", "k", "SUM(a) AS TMP_0")
- ),
- term("select", "TMP_0")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testSelectFromAggregatedPojoTable(): Unit = {
- val util = batchTestUtil()
- val sourceTable = util.addTable[WC]("MyTable", 'word, 'frequency)
- val resultTable = sourceTable
- .groupBy('word)
- .select('word, 'frequency.sum as 'frequency)
- .filter('frequency === 2)
- val expected =
- unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetAggregate",
- batchTableNode(0),
- term("groupBy", "word"),
- term("select", "word", "SUM(frequency) AS TMP_0")
- ),
- term("select", "word, TMP_0 AS frequency"),
- term("where", "=(TMP_0, 2)")
- )
-
- util.verifyTable(resultTable, expected)
- }
-
-}
-
-object FieldProjectionTest {
-
- object MyHashCode extends ScalarFunction {
- def eval(s: String): Int = s.hashCode()
- }
-
- case class WC(word: String, frequency: Long)
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
deleted file mode 100644
index 84a6738..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.WindowReference
-import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class GroupWindowTest extends TableTestBase {
-
- //===============================================================================================
- // Common test
- //===============================================================================================
-
- @Test
- def testEventTimeTumblingGroupWindowOverCount(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Tumble over 2.rows on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataSetWindowAggregate",
- batchTableNode(0),
- term("groupBy", "string"),
- term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testEventTimeTumblingGroupWindowOverTimeWithUdAgg(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val myWeightedAvg = new WeightedAvgWithMerge
-
- val windowedTable = table
- .window(Tumble over 5.milli on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, myWeightedAvg('long, 'int))
-
- val expected = unaryNode(
- "DataSetWindowAggregate",
- batchTableNode(0),
- term("groupBy", "string"),
- term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
- term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testEventTimeTumblingGroupWindowOverTime(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Tumble over 5.milli on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataSetWindowAggregate",
- batchTableNode(0),
- term("groupBy", "string"),
- term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Tumble over 5.milli on 'long as 'w)
- .groupBy('w)
- .select('int.count)
-
- val expected = unaryNode(
- "DataSetWindowAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "int", "long")
- ),
- term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
- term("select", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Tumble over 2.rows on 'long as 'w)
- .groupBy('w)
- .select('int.count)
-
- val expected = unaryNode(
- "DataSetWindowAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "int", "long")
- ),
- term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
- term("select", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- //===============================================================================================
- // Sliding Windows
- //===============================================================================================
-
- @Test
- def testEventTimeSlidingGroupWindowOverTime(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Slide over 8.milli every 10.milli on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataSetWindowAggregate",
- batchTableNode(0),
- term("groupBy", "string"),
- term("window",
- SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testEventTimeSlidingGroupWindowOverCount(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Slide over 2.rows every 1.rows on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataSetWindowAggregate",
- batchTableNode(0),
- term("groupBy", "string"),
- term("window",
- SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testEventTimeSlidingGroupWindowOverTimeWithUdAgg(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val myWeightedAvg = new WeightedAvgWithMerge
-
- val windowedTable = table
- .window(Slide over 8.milli every 10.milli on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, myWeightedAvg('long, 'int))
-
- val expected = unaryNode(
- "DataSetWindowAggregate",
- batchTableNode(0),
- term("groupBy", "string"),
- term("window",
- SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
- term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Slide over 8.milli every 10.milli on 'long as 'w)
- .groupBy('w)
- .select('int.count)
-
- val expected = unaryNode(
- "DataSetWindowAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "int", "long")
- ),
- term("window",
- SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
- term("select", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Slide over 2.rows every 1.rows on 'long as 'w)
- .groupBy('w)
- .select('int.count)
-
- val expected = unaryNode(
- "DataSetWindowAggregate",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "int", "long")
- ),
- term("window",
- SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
- term("select", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- //===============================================================================================
- // Session Windows
- //===============================================================================================
-
- @Test
- def testEventTimeSessionGroupWindowOverTime(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Session withGap 7.milli on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataSetWindowAggregate",
- batchTableNode(0),
- term("groupBy", "string"),
- term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testEventTimeSessionGroupWindowOverTimeWithUdAgg(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val myWeightedAvg = new WeightedAvgWithMerge
-
- val windowedTable = table
- .window(Session withGap 7.milli on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, myWeightedAvg('long, 'int))
-
- val expected = unaryNode(
- "DataSetWindowAggregate",
- batchTableNode(0),
- term("groupBy", "string"),
- term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
- term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index 079b10a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils._
-import org.junit.Test
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
- @Test
- def testCrossJoin(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = util.addFunction("func1", new TableFunc1)
-
- val result1 = table.join(function('c) as 's).select('c, 's)
-
- val expected1 = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", s"${function.functionIdentifier}($$2)"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "s")
- )
-
- util.verifyTable(result1, expected1)
-
- // test overloading
-
- val result2 = table.join(function('c, "$") as 's).select('c, 's)
-
- val expected2 = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", s"${function.functionIdentifier}($$2, '$$')"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "s")
- )
-
- util.verifyTable(result2, expected2)
- }
-
- @Test
- def testLeftOuterJoin(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = util.addFunction("func1", new TableFunc1)
-
- val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
-
- val expected = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", s"${function.functionIdentifier}($$2)"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
- term("joinType", "LEFT")
- ),
- term("select", "c", "s")
- )
-
- util.verifyTable(result, expected)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
deleted file mode 100644
index b085160..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.stringexpr
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMergeAndReset
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
-import org.apache.flink.table.functions.aggfunctions.CountAggFunction
-import org.apache.flink.table.utils.TableTestBase
-import org.junit._
-
-class AggregationsStringExpressionTest extends TableTestBase {
-
- @Test
- def testAggregationTypes(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3")
-
- val t1 = t.select('_1.sum, '_1.sum0, '_1.min, '_1.max, '_1.count, '_1.avg)
- val t2 = t.select("_1.sum, _1.sum0, _1.min, _1.max, _1.count, _1.avg")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testWorkingAggregationDataTypes(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Byte, Short, Int, Long, Float, Double, String)]("Table7")
-
- val t1 = t.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
- val t2 = t.select("_1.avg, _2.avg, _3.avg, _4.avg, _5.avg, _6.avg, _7.count")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testProjection(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Byte, Short)]("Table2")
-
- val t1 = t.select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
- val t2 = t.select("_1.avg, _1.sum, _1.count, _2.avg, _2.sum")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testAggregationWithArithmetic(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Long, String)]("Table2")
-
- val t1 = t.select(('_1 + 2).avg + 2, '_2.count + 5)
- val t2 = t.select("(_1 + 2).avg + 2, _2.count + 5")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testAggregationWithTwoCount(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Long, String)]("Table2")
-
- val t1 = t.select('_1.count, '_2.count)
- val t2 = t.select("_1.count, _2.count")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testAggregationAfterProjection(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Byte, Short, Int, Long, Float, Double, String)]("Table7")
-
- val t1 = t.select('_1, '_2, '_3)
- .select('_1.avg, '_2.sum, '_3.count)
-
- val t2 = t.select("_1, _2, _3")
- .select("_1.avg, _2.sum, _3.count")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testDistinct(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- val distinct = ds.select('b).distinct()
- val distinct2 = ds.select("b").distinct()
-
- verifyTableEquals(distinct, distinct2)
- }
-
- @Test
- def testDistinctAfterAggregate(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
-
- val distinct = ds.groupBy('a, 'e).select('e).distinct()
- val distinct2 = ds.groupBy("a, e").select("e").distinct()
-
- val lPlan1 = distinct.logicalPlan
- val lPlan2 = distinct2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testGroupedAggregate(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val t1 = t.groupBy('b).select('b, 'a.sum)
- val t2 = t.groupBy("b").select("b, a.sum")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testGroupingKeyForwardIfNotUsed(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val t1 = t.groupBy('b).select('a.sum)
- val t2 = t.groupBy("b").select("a.sum")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testGroupNoAggregation(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val t1 = t
- .groupBy('b)
- .select('a.sum as 'd, 'b)
- .groupBy('b, 'd)
- .select('b)
-
- val t2 = t
- .groupBy("b")
- .select("a.sum as d, b")
- .groupBy("b, d")
- .select("b")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testGroupedAggregateWithConstant1(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val t1 = t.select('a, 4 as 'four, 'b)
- .groupBy('four, 'a)
- .select('four, 'b.sum)
-
- val t2 = t.select("a, 4 as four, b")
- .groupBy("four, a")
- .select("four, b.sum")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testGroupedAggregateWithConstant2(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val t1 = t.select('b, 4 as 'four, 'a)
- .groupBy('b, 'four)
- .select('four, 'a.sum)
- val t2 = t.select("b, 4 as four, a")
- .groupBy("b, four")
- .select("four, a.sum")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testGroupedAggregateWithExpression(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
-
- val t1 = t.groupBy('e, 'b % 3)
- .select('c.min, 'e, 'a.avg, 'd.count)
- val t2 = t.groupBy("e, b % 3")
- .select("c.min, e, a.avg, d.count")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testGroupedAggregateWithFilter(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val t1 = t.groupBy('b)
- .select('b, 'a.sum)
- .where('b === 2)
- val t2 = t.groupBy("b")
- .select("b, a.sum")
- .where("b = 2")
-
- verifyTableEquals(t1, t2)
- }
-
- @Test
- def testAnalyticAggregation(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, Float, Double)]('_1, '_2, '_3, '_4)
-
- val resScala = t.select(
- '_1.stddevPop, '_2.stddevPop, '_3.stddevPop, '_4.stddevPop,
- '_1.stddevSamp, '_2.stddevSamp, '_3.stddevSamp, '_4.stddevSamp,
- '_1.varPop, '_2.varPop, '_3.varPop, '_4.varPop,
- '_1.varSamp, '_2.varSamp, '_3.varSamp, '_4.varSamp)
- val resJava = t.select("""
- _1.stddevPop, _2.stddevPop, _3.stddevPop, _4.stddevPop,
- _1.stddevSamp, _2.stddevSamp, _3.stddevSamp, _4.stddevSamp,
- _1.varPop, _2.varPop, _3.varPop, _4.varPop,
- _1.varSamp, _2.varSamp, _3.varSamp, _4.varSamp""")
-
- verifyTableEquals(resScala, resJava)
- }
-
- @Test
- def testAggregateWithUDAGG(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val myCnt = new CountAggFunction
- util.tableEnv.registerFunction("myCnt", myCnt)
- val myWeightedAvg = new WeightedAvgWithMergeAndReset
- util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
-
- val t1 = t.select(myCnt('a) as 'aCnt, myWeightedAvg('b, 'a) as 'wAvg)
- val t2 = t.select("myCnt(a) as aCnt, myWeightedAvg(b, a) as wAvg")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- val x = LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString)
- val y = LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString)
-
- Assert.assertEquals(
- "Logical Plans do not match",
- LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
- LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
- }
-
- @Test
- def testGroupedAggregateWithUDAGG(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-
- val myCnt = new CountAggFunction
- util.tableEnv.registerFunction("myCnt", myCnt)
- val myWeightedAvg = new WeightedAvgWithMergeAndReset
- util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
-
- val t1 = t.groupBy('b)
- .select('b, myCnt('a) + 9 as 'aCnt, myWeightedAvg('b, 'a) * 2 as 'wAvg, myWeightedAvg('a, 'a))
- val t2 = t.groupBy("b")
- .select("b, myCnt(a) + 9 as aCnt, myWeightedAvg(b, a) * 2 as wAvg, myWeightedAvg(a, a)")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals(
- "Logical Plans do not match",
- LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
- LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
deleted file mode 100644
index 9736ec1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.stringexpr
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.utils.TableTestBase
-import org.junit._
-
-class CalcStringExpressionTest extends TableTestBase {
-
- @Test
- def testSimpleSelectAllWithAs(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- val t1 = t.select('a, 'b, 'c)
- val t2 = t.select("a, b, c")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testSimpleSelectWithNaming(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3")
-
- val t1 = t
- .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
- .select('a, 'b)
-
- val t2 = t
- .select("_1 as a, _2 as b, _1 as c")
- .select("a, b")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testSimpleSelectRenameAll(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3")
-
- val t1 = t
- .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
- .select('a, 'b)
-
- val t2 = t
- .select("_1 as a, _2 as b, _3 as c")
- .select("a, b")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testSelectStar(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- val t1 = t.select('*)
- val t2 = t.select("*")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testAllRejectingFilter(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- val t1 = ds.filter( Literal(false) )
- val t2 = ds.filter("faLsE")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testAllPassingFilter(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- val t1 = ds.filter( Literal(true) )
- val t2 = ds.filter("trUe")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testFilterOnStringTupleField(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- val t1 = ds.filter( 'c.like("%world%") )
- val t2 = ds.filter("c.like('%world%')")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testFilterOnIntegerTupleField(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- val t1 = ds.filter( 'a % 2 === 0 )
- val t2 = ds.filter( "a % 2 = 0 ")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testNotEquals(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- val t1 = ds.filter( 'a % 2 !== 0 )
- val t2 = ds.filter("a % 2 <> 0")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testDisjunctivePredicate(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- val t1 = ds.filter( 'a < 2 || 'a > 20)
- val t2 = ds.filter("a < 2 || a > 20")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testConsecutiveFilters(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- val t1 = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
- val t2 = ds.filter("a % 2 != 0").filter("b % 2 = 0")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testFilterBasicType(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[String]("Table3",'a)
-
- val t1 = ds.filter( 'a.like("H%") )
- val t2 = ds.filter( "a.like('H%')" )
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testFilterOnCustomType(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[CustomType]("Table3",'myInt as 'i, 'myLong as 'l, 'myString as 's)
-
- val t1 = t.filter( 's.like("%a%") )
- val t2 = t.filter("s.like('%a%')")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testSimpleCalc(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3")
-
- val t1 = t.select('_1, '_2, '_3)
- .where('_1 < 7)
- .select('_1, '_3)
-
- val t2 = t.select("_1, _2, _3")
- .where("_1 < 7")
- .select("_1, _3")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testCalcWithTwoFilters(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3")
-
- val t1 = t.select('_1, '_2, '_3)
- .where('_1 < 7 && '_2 === 3)
- .select('_1, '_3)
- .where('_1 === 4)
- .select('_1)
-
- val t2 = t.select("_1, _2, _3")
- .where("_1 < 7 && _2 = 3")
- .select("_1, _3")
- .where("_1 === 4")
- .select("_1")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testCalcWithAggregation(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3")
-
- val t1 = t.select('_1, '_2, '_3)
- .where('_1 < 15)
- .groupBy('_2)
- .select('_1.min, '_2.count as 'cnt)
- .where('cnt > 3)
-
-
- val t2 = t.select("_1, _2, _3")
- .where("_1 < 15")
- .groupBy("_2")
- .select("_1.min, _2.count as cnt")
- .where("cnt > 3")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match",
- LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
- LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
- }
-
- @Test
- def testCalcJoin(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1 = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
- .where('b > 1).select('a, 'd).where('d === 2)
- val t2 = ds1.select("a, b").join(ds2).where("b = e").select("a, b, d, e, f")
- .where("b > 1").select("a, d").where("d = 2")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testAdvancedDataTypes(): Unit = {
- val util = batchTestUtil()
- val t = util
- .addTable[(BigDecimal, BigDecimal, Date, Time, Timestamp)]("Table5", 'a, 'b, 'c, 'd, 'e)
-
- val t1 = t.select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
- "1984-07-12".cast(Types.SQL_DATE), "14:34:24".cast(Types.SQL_TIME),
- "1984-07-12 14:34:24".cast(Types.SQL_TIMESTAMP))
- val t2 = t.select("a, b, c, d, e, 11.2, 11.2," +
- "'1984-07-12'.toDate, '14:34:24'.toTime," +
- "'1984-07-12 14:34:24'.toTimestamp")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1.toString, lPlan2.toString)
- }
-
- @Test
- def testIntegerBiggerThan128(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- val t1 = t.filter('a === 300)
- val t2 = t.filter("a = 300")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
deleted file mode 100644
index 8ca5745..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.stringexpr
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.Types._
-import org.apache.flink.table.api.scala._
-import org.junit._
-
-class CastingStringExpressionTest {
-
- @Test
- def testNumericAutoCastInArithmetic() {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
-
- val table = env.fromElements(
- (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tableEnv)
- val t1 = table.select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f,
- '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
- val t2 = table.select("_1 + 1, _2 +" +
- " 1, _3 + 1L, _4 + 1.0f, _5 + 1.0d, _6 + 1, _7 + 1.0d, _8 + _1")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- @throws[Exception]
- def testNumericAutoCastInComparison() {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
-
- val table = env.fromElements(
- (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d),
- (2.toByte, 2.toShort, 2, 2L, 2.0f, 2.0d))
- .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e, 'f)
- val t1 = table.filter('a > 1 && 'b > 1 && 'c > 1L &&
- 'd > 1.0f && 'e > 1.0d && 'f > 1)
- val t2 = table
- .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- @throws[Exception]
- def testCasting() {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val table = env.fromElements((1, 0.0, 1L, true)).toTable(tableEnv)
- val t1 = table .select(
- // * -> String
- '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING),
- // NUMERIC TYPE -> Boolean
- '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), '_3.cast(BOOLEAN),
- // NUMERIC TYPE -> NUMERIC TYPE
- '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT),
- // Boolean -> NUMERIC TYPE
- '_4.cast(DOUBLE), // identity casting
- '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN))
- val t2 = table.select(
- // * -> String
- "_1.cast(STRING), _2.cast(STRING), _3.cast(STRING), _4.cast(STRING)," +
- // NUMERIC TYPE -> Boolean
- "_1.cast(BOOLEAN), _2.cast(BOOLEAN), _3.cast(BOOLEAN)," +
- // NUMERIC TYPE -> NUMERIC TYPE
- "_1.cast(DOUBLE), _2.cast(INT), _3.cast(SHORT)," +
- // Boolean -> NUMERIC TYPE
- "_4.cast(DOUBLE)," +
- // identity casting
- "_1.cast(INT), _2.cast(DOUBLE), _3.cast(LONG), _4.cast(BOOLEAN)")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- @throws[Exception]
- def testCastFromString() {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val table = env.fromElements(("1", "true", "2.0")).toTable(tableEnv)
- val t1 = table .select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG),
- '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN))
- val t2 = table.select(
- "_1.cast(BYTE), _1.cast(SHORT), _1.cast(INT), _1.cast(LONG), " +
- "_3.cast(DOUBLE), _3.cast(FLOAT), _2.cast(BOOLEAN)")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
deleted file mode 100644
index 067441d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.stringexpr
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.utils.TableTestBase
-import org.junit._
-
-class JoinStringExpressionTest extends TableTestBase {
-
- @Test
- def testJoin(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1Scala = ds1.join(ds2).where('b === 'e).select('c, 'g)
- val t1Java = ds1.join(ds2).where("b === e").select("c, g")
-
- val lPlan1 = t1Scala.logicalPlan
- val lPlan2 = t1Java.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testJoinWithFilter(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1Scala = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
- val t1Java = ds1.join(ds2).where("b === e && b < 2").select("c, g")
-
- val lPlan1 = t1Scala.logicalPlan
- val lPlan2 = t1Java.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testJoinWithJoinFilter(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1Scala = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
- val t1Java = ds1.join(ds2).where("b === e && a < 6 && h < b").select("c, g")
-
- val lPlan1 = t1Scala.logicalPlan
- val lPlan2 = t1Java.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testJoinWithMultipleKeys(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1Scala = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
- val t1Java = ds1.join(ds2).filter("a === d && b === h").select("c, g")
-
- val lPlan1 = t1Scala.logicalPlan
- val lPlan2 = t1Java.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testJoinWithAggregation(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1Scala = ds1.join(ds2).where('a === 'd).select('g.count)
- val t1Java = ds1.join(ds2).where("a === d").select("g.count")
-
- val lPlan1 = t1Scala.logicalPlan
- val lPlan2 = t1Java.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match",
- LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
- LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
- }
-
- @Test
- def testJoinWithGroupedAggregation(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1 = ds1.join(ds2)
- .where('a === 'd)
- .groupBy('a, 'd)
- .select('b.sum, 'g.count)
- val t2 = ds1.join(ds2)
- .where("a = d")
- .groupBy("a, d")
- .select("b.sum, g.count")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match",
- LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
- LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
- }
-
- @Test
- def testJoinPushThroughJoin(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds3 = util.addTable[(Int, Long, String)]("Table4",'j, 'k, 'l)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1 = ds1.join(ds2)
- .where(Literal(true))
- .join(ds3)
- .where('a === 'd && 'e === 'k)
- .select('a, 'f, 'l)
- val t2 = ds1.join(ds2)
- .where("true")
- .join(ds3)
- .where("a === d && e === k")
- .select("a, f, l")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testJoinWithDisjunctivePred(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1 = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10)).select('c, 'g)
- val t2 = ds1.join(ds2).filter("a = d && (b = e || b = e - 10)").select("c, g")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testJoinWithExpressionPreds(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1 = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
- val t2 = ds1.join(ds2).filter("b = h + 1 && a - 1 = d + 2").select("c, g")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testLeftJoinWithMultipleKeys(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1 = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
- val t2 = ds1.leftOuterJoin(ds2, "a = d && b = h").select("c, g")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testRightJoinWithMultipleKeys(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1 = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
- val t2 = ds1.rightOuterJoin(ds2, "a = d && b = h").select("c, g")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
- def testFullOuterJoinWithMultipleKeys(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val t1 = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
- val t2 = ds1.fullOuterJoin(ds2, "a = d && b = h").select("c, g")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
deleted file mode 100644
index 8cf9979..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table.stringexpr
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
-import org.apache.flink.table.utils.{PojoTableFunc, TableFunc2, _}
-import org.apache.flink.table.api.{Table, Types}
-import org.junit.Test
-
-class UserDefinedTableFunctionStringExpressionTest extends TableTestBase {
-
- @Test
- def testJoin(): Unit = {
- val util = batchTestUtil()
-
- val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
- val sTab = util.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c)
- val jTab = util.addJavaTable[Row](typeInfo, "Table2", "a, b, c")
-
- // test cross join
- val func1 = new TableFunc1
- util.javaTableEnv.registerFunction("func1", func1)
- var scalaTable = sTab.join(func1('c) as 's).select('c, 's)
- var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test left outer join
- scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's)
- javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "as(func1(c), s)")).select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test overloading
- scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's)
- javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test custom result type
- val func2 = new TableFunc2
- util.javaTableEnv.registerFunction("func2", func2)
- scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len)
- javaTable = jTab.join(
- new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
- verifyTableEquals(scalaTable, javaTable)
-
- // test hierarchy generic type
- val hierarchy = new HierarchyTableFunction
- util.javaTableEnv.registerFunction("hierarchy", hierarchy)
- scalaTable = sTab.join(hierarchy('c) as('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
- javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
- .select("c, name, len, adult")
- verifyTableEquals(scalaTable, javaTable)
-
- // test pojo type
- val pojo = new PojoTableFunc
- util.javaTableEnv.registerFunction("pojo", pojo)
- scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age)
- javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age")
- verifyTableEquals(scalaTable, javaTable)
-
- // test with filter
- scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len).filter('len > 2)
- javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)"))
- .select("c, name, len").filter("len > 2")
- verifyTableEquals(scalaTable, javaTable)
-
- // test with scalar function
- scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's)
- javaTable = jTab.join(
- new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s")
- verifyTableEquals(scalaTable, javaTable)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
deleted file mode 100644
index 25ecd96..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMergeAndReset}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
-import org.apache.flink.table.utils.TableTestBase
-import org.junit._
-
-class AggregationsValidationTest extends TableTestBase {
-
- /**
- * OVER clause is necessary for [[OverAgg0]] window function.
- */
- @Test(expected = classOf[ValidationException])
- def testOverAggregation(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- val overAgg = new OverAgg0
- t.select('c.count, overAgg('b, 'a))
- }
-
- @Test(expected = classOf[ValidationException])
- def testNonWorkingAggregationDataTypes(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- env.fromElements(("Hello", 1)).toTable(tEnv)
- // Must fail. Field '_1 is not a numeric type.
- .select('_1.sum)
- }
-
- @Test(expected = classOf[ValidationException])
- def testNoNestedAggregations(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(String, Int)]("Table2")
-
- // Must fail. Sum aggregation can not be chained.
- t.select('_2.sum.sum)
- }
-
- @Test(expected = classOf[ValidationException])
- def testGroupingOnNonExistentField(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- // must fail. '_foo not a valid field
- t.groupBy('_foo)
- .select('a.avg)
- }
-
- @Test(expected = classOf[ValidationException])
- def testGroupingInvalidSelection(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- t.groupBy('a, 'b)
- // must fail. 'c is not a grouping key or aggregation
- .select('c)
- }
-
- @Test(expected = classOf[ValidationException])
- def testAggregationOnNonExistingField(): Unit = {
-
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- // Must fail. Field 'foo does not exist.
- t.select('foo.avg)
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testInvalidUdAggArgs() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val myWeightedAvg = new WeightedAvgWithMergeAndReset
-
- // must fail. UDAGG does not accept String type
- t.select(myWeightedAvg('c, 'a))
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testGroupingInvalidUdAggArgs() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val myWeightedAvg = new WeightedAvgWithMergeAndReset
-
- t.groupBy('b)
- // must fail. UDAGG does not accept String type
- .select(myWeightedAvg('c, 'a))
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testGroupingNestedUdAgg() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val myWeightedAvg = new WeightedAvgWithMergeAndReset
-
- t.groupBy('c)
- // must fail. UDAGG does not accept String type
- .select(myWeightedAvg(myWeightedAvg('b, 'a), 'a))
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testAggregationOnNonExistingFieldJava() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- t.select("foo.avg")
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testNonWorkingAggregationDataTypesJava() {
- val util = batchTestUtil()
- val t = util.addTable[(Long, String)]("Table2",'b, 'c)
- // Must fail. Cannot compute SUM aggregate on String field.
- t.select("c.sum")
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testNoNestedAggregationsJava() {
- val util = batchTestUtil()
- val t = util.addTable[(Long, String)]("Table2",'b, 'c)
- // Must fail. Aggregation on aggregation not allowed.
- t.select("b.sum.sum")
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testGroupingOnNonExistentFieldJava() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- // must fail. Field foo is not in input
- t.groupBy("foo")
- .select("a.avg")
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testGroupingInvalidSelectionJava() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- t.groupBy("a, b")
- // must fail. Field c is not a grouping key or aggregation
- .select("c")
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testUnknownUdAggJava() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- // must fail. unknown is not known
- t.select("unknown(c)")
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testGroupingUnknownUdAggJava() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- t.groupBy("a, b")
- // must fail. unknown is not known
- .select("unknown(c)")
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testInvalidUdAggArgsJava() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val myWeightedAvg = new WeightedAvgWithMergeAndReset
- util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
-
- // must fail. UDAGG does not accept String type
- t.select("myWeightedAvg(c, a)")
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testGroupingInvalidUdAggArgsJava() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val myWeightedAvg = new WeightedAvgWithMergeAndReset
- util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
-
- t.groupBy("b")
- // must fail. UDAGG does not accept String type
- .select("myWeightedAvg(c, a)")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
deleted file mode 100644
index e2a5dac..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit._
-
-class CalcValidationTest extends TableTestBase {
-
- @Test(expected = classOf[ValidationException])
- def testSelectInvalidFieldFields(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- // must fail. Field 'foo does not exist
- .select('a, 'foo)
- }
-
- @Test(expected = classOf[ValidationException])
- def testSelectAmbiguousRenaming(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- // must fail. 'a and 'b are both renamed to 'foo
- .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
- }
-
- @Test(expected = classOf[ValidationException])
- def testSelectAmbiguousRenaming2(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- // must fail. 'a and 'b are both renamed to 'a
- .select('a, 'b as 'a).toDataSet[Row].print()
- }
-
- @Test(expected = classOf[ValidationException])
- def testFilterInvalidFieldName(): Unit = {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- // must fail. Field 'foo does not exist
- t.filter( 'foo === 2 )
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testSelectInvalidField() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- // Must fail. Field foo does not exist
- t.select("a + 1, foo + 2")
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testSelectAmbiguousFieldNames() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- // Must fail. Field foo does not exist
- t.select("a + 1 as foo, b + 2 as foo")
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testFilterInvalidField() {
- val util = batchTestUtil()
- val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-
- // Must fail. Field foo does not exist.
- t.filter("foo = 17")
- }
-
- @Test
- def testAliasStarException(): Unit = {
- val util = batchTestUtil()
-
- try {
- util.addTable[(Int, Long, String)]("Table1", '*, 'b, 'c)
- fail("TableException expected")
- } catch {
- case _: TableException => //ignore
- }
-
- try {
- util.addTable[(Int, Long, String)]("Table2")
- .select('_1 as '*, '_2 as 'b, '_1 as 'c)
- fail("ValidationException expected")
- } catch {
- case _: ValidationException => //ignore
- }
-
- try {
- util.addTable[(Int, Long, String)]("Table3").as('*, 'b, 'c)
- fail("ValidationException expected")
- } catch {
- case _: ValidationException => //ignore
- }
- try {
- util.addTable[(Int, Long, String)]("Table4", 'a, 'b, 'c).select('*, 'b)
- fail("ValidationException expected")
- } catch {
- case _: ValidationException => //ignore
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala
deleted file mode 100644
index 0ac205a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.junit.Test
-
-class CompositeFlatteningValidationTest extends TableTestBase {
-
- @Test(expected = classOf[ValidationException])
- def testDuplicateFlattening(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
- table.select('a.flatten(), 'a.flatten())
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala
deleted file mode 100644
index b8e2b97..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Test
-
-class GroupWindowValidationTest extends TableTestBase {
-
- //===============================================================================================
- // Common test
- //===============================================================================================
-
- /**
- * OVER clause is necessary for [[OverAgg0]] window function.
- */
- @Test(expected = classOf[ValidationException])
- def testOverAggregation(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
- val overAgg = new OverAgg0
- table
- .window(Tumble over 5.milli on 'long as 'w)
- .groupBy('string,'w)
- .select(overAgg('long, 'int))
- }
-
- @Test(expected = classOf[ValidationException])
- def testGroupByWithoutWindowAlias(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .window(Tumble over 5.milli on 'long as 'w)
- .groupBy('string)
- .select('string, 'int.count)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidRowTimeRef(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .window(Tumble over 5.milli on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count)
- .window(Slide over 5.milli every 1.milli on 'int as 'w2) // 'Int does not exist in input.
- .groupBy('w2)
- .select('string)
- }
-
- //===============================================================================================
- // Tumbling Windows
- //===============================================================================================
-
- @Test(expected = classOf[ValidationException])
- def testInvalidProcessingTimeDefinition(): Unit = {
- val util = batchTestUtil()
- // proctime is not allowed
- util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidProcessingTimeDefinition2(): Unit = {
- val util = batchTestUtil()
- // proctime is not allowed
- util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidEventTimeDefinition(): Unit = {
- val util = batchTestUtil()
- // definition must not extend schema
- util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
- }
-
- @Test(expected = classOf[ValidationException])
- def testTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val myWeightedAvg = new WeightedAvgWithMerge
-
- table
- .window(Tumble over 2.minutes on 'rowtime as 'w)
- .groupBy('w, 'long)
- // invalid function arguments
- .select(myWeightedAvg('int, 'string))
- }
-
- @Test(expected = classOf[ValidationException])
- def testAllTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val myWeightedAvg = new WeightedAvgWithMerge
-
- table
- .window(Tumble over 2.minutes on 'rowtime as 'w)
- .groupBy('w)
- // invalid function arguments
- .select(myWeightedAvg('int, 'string))
- }
-
- //===============================================================================================
- // Sliding Windows
- //===============================================================================================
-
- @Test(expected = classOf[ValidationException])
- def testSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val myWeightedAvg = new WeightedAvgWithMerge
-
- table
- .window(Slide over 2.minutes every 1.minute on 'rowtime as 'w)
- .groupBy('w, 'long)
- // invalid function arguments
- .select(myWeightedAvg('int, 'string))
- }
-
- @Test(expected = classOf[ValidationException])
- def testAllSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val myWeightedAvg = new WeightedAvgWithMerge
-
- table
- .window(Slide over 2.minutes every 1.minute on 'long as 'w)
- .groupBy('w)
- // invalid function arguments
- .select(myWeightedAvg('int, 'string))
- }
-
- @Test(expected = classOf[ValidationException])
- def testSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val myWeightedAvg = new WeightedAvgWithMerge
-
- table
- .window(Session withGap 2.minutes on 'rowtime as 'w)
- .groupBy('w, 'long)
- // invalid function arguments
- .select(myWeightedAvg('int, 'string))
- }
-
- @Test(expected = classOf[ValidationException])
- def testAllSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val myWeightedAvg = new WeightedAvgWithMerge
-
- table
- .window(Session withGap 2.minutes on 'rowtime as 'w)
- .groupBy('w)
- // invalid function arguments
- .select(myWeightedAvg('int, 'string))
- }
-
-}
|