flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [2/9] flink git commit: [FLINK-3847] Restructure flink-table test packages.
Date Sun, 01 May 2016 12:47:16 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
deleted file mode 100644
index 2442091..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.table.{TableEnvironment, TableException, Row}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithFilter(): Unit = {
-
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
-
-    val expected = "Hi,Hallo\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithJoinFilter(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
-
-    val expected = "Hello world, how are you?,Hallo Welt wie\n" +
-      "I am fine.,Hallo Welt wie\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testJoinNonExistingKey(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. Field 'foo does not exist
-      .where('foo === 'e)
-      .select('c, 'g)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testJoinWithNonMatchingKeyTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. Field 'a is Int, and 'g is String
-      .where('a === 'g)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testJoinWithAmbiguousFields(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c)
-
-    ds1.join(ds2)
-      // must fail. Both inputs share the same field 'c
-      .where('a === 'd)
-      .select('c, 'g)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testNoEqualityJoinPredicate1(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. No equality join predicate
-      .where('d === 'f)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[TableException])
-  def testNoEqualityJoinPredicate2(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. No equality join predicate
-      .where('a < 'd)
-      .select('c, 'g).collect()
-  }
-
-  @Test
-  def testJoinWithAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('a === 'd).select('g.count)
-
-    val expected = "6"
-    val results = joinT.toDataSet[Row]collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithGroupedAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2)
-      .where('a === 'd)
-      .groupBy('a, 'd)
-      .select('b.sum, 'g.count)
-
-    val expected = "6,3\n" + "4,2\n" + "1,1"
-    val results = joinT.toDataSet[Row]collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinPushThroughJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'j, 'k, 'l)
-
-    val joinT = ds1.join(ds2)
-      .where(Literal(true))
-      .join(ds3)
-      .where('a === 'd && 'e === 'k)
-      .select('a, 'f, 'l)
-
-    val expected = "2,1,Hello\n" + "2,1,Hello world\n" + "1,0,Hi"
-    val results = joinT.toDataSet[Row]collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithDisjunctivePred(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10 )).select('c, 'g)
-
-    val expected =
-      "Hi,Hallo\n" +
-        "Hello,Hallo Welt\n" +
-        "I am fine.,IJK"
-    val results = joinT.toDataSet[Row]collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithExpressionPreds(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
-
-    val expected =
-        "I am fine.,Hallo Welt\n" +
-        "Luke Skywalker,Hallo Welt wie gehts?\n" +
-        "Luke Skywalker,ABC\n" +
-        "Comment#2,HIJ\n" +
-        "Comment#2,IJK"
-    val results = joinT.toDataSet[Row]collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testJoinTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.join(ds2).where('b === 'e).select('c, 'g)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
deleted file mode 100644
index 82668a1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableEnvironment, Row}
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class SelectITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).select('_1, '_2, '_3)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('a, 'b, 'c)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
-      .select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectRenameAll(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
-      .select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testSelectInvalidFieldFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. Field 'foo does not exist
-      .select('a, 'foo)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testSelectAmbiguousRenaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'foo
-      .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testSelectAmbiguousRenaming2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'a
-      .select('a, 'b as 'a).toDataSet[Row].print()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
deleted file mode 100644
index 94361c6..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.api.table.{Row, TableEnvironment}
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class SortITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  def getExecutionEnvironment = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(4)
-    env
-  }
-
-  val tupleDataSetStrings = List((1, 1L, "Hi")
-    ,(2, 2L, "Hello")
-    ,(3, 2L, "Hello world")
-    ,(4, 3L, "Hello world, how are you?")
-    ,(5, 3L, "I am fine.")
-    ,(6, 3L, "Luke Skywalker")
-    ,(7, 4L, "Comment#1")
-    ,(8, 4L, "Comment#2")
-    ,(9, 4L, "Comment#3")
-    ,(10, 4L, "Comment#4")
-    ,(11, 5L, "Comment#5")
-    ,(12, 5L, "Comment#6")
-    ,(13, 5L, "Comment#7")
-    ,(14, 5L, "Comment#8")
-    ,(15, 5L, "Comment#9")
-    ,(16, 6L, "Comment#10")
-    ,(17, 6L, "Comment#11")
-    ,(18, 6L, "Comment#12")
-    ,(19, 6L, "Comment#13")
-    ,(20, 6L, "Comment#14")
-    ,(21, 6L, "Comment#15"))
-
-  @Test
-  def testOrderByDesc(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.desc)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      - x.productElement(0).asInstanceOf[Int])
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByAsc(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int])
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByMultipleFieldsDifferentDirections(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc, '_2.desc)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      (x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long]))
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByMultipleFieldsWithSql(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC"
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      (- x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long]))
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds)
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
-    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  def sortExpectedly(dataSet: List[Product])(implicit ordering: Ordering[Product]): String = {
-    dataSet.sorted(ordering).mkString("\n").replaceAll("[\\(\\)]", "")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
deleted file mode 100644
index 545721d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.test.util.MultipleProgramsTestBase
-
-import org.junit._
-import org.junit.Assert.assertEquals
-
-class SqlExplainTest
-  extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
-
-  val testFilePath = SqlExplainTest.this.getClass.getResource("/").getFile
-
-  @Test
-  def testFilterWithoutExtended() : Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements((1, "hello"))
-      .toTable(tEnv, 'a, 'b)
-      .filter("a % 2 = 0")
-
-    val result = tEnv.explain(table)
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter0.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testFilterWithExtended() : Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements((1, "hello"))
-      .toTable(tEnv, 'a, 'b)
-      .filter("a % 2 = 0")
-
-    val result = tEnv.explain(table, true)
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter1.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testJoinWithoutExtended() : Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
-    val table = table1.join(table2).where("b = d").select("a, c")
-
-    val result = tEnv.explain(table)
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin0.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testJoinWithExtended() : Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
-    val table = table1.join(table2).where("b = d").select("a, c")
-
-    val result = tEnv.explain(table, true)
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin1.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testUnionWithoutExtended() : Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table = table1.unionAll(table2)
-
-    val result = tEnv.explain(table)
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion0.out").mkString
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testUnionWithExtended() : Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table = table1.unionAll(table2)
-
-    val result = tEnv.explain(table, true)
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion1.out").mkString
-    assertEquals(result, source)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
deleted file mode 100644
index 1ad57b4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{TableEnvironment, Row}
-import org.apache.flink.api.table.codegen.CodeGenException
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testSubstring(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("AAAA", 2), ("BBBB", 1)).toTable(tEnv, 'a, 'b)
-      .select('a.substring(1, 'b))
-
-    val expected = "AA\nB"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSubstringWithMaxEnd(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("ABCD", 3), ("ABCD", 2)).toTable(tEnv, 'a, 'b)
-      .select('a.substring('b))
-
-    val expected = "CD\nBCD"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[CodeGenException])
-  def testNonWorkingSubstring1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).toTable(tEnv, 'a, 'b)
-      // must fail, second argument of substring must be Integer not Double.
-      .select('a.substring(0, 'b))
-
-    t.toDataSet[Row].collect()
-  }
-
-  @Test(expected = classOf[CodeGenException])
-  def testNonWorkingSubstring2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("AAAA", "c"), ("BBBB", "d")).toTable(tEnv, 'a, 'b)
-      // must fail, first argument of substring must be Integer not String.
-      .select('a.substring('b, 15))
-
-    t.toDataSet[Row].collect()
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableEnvironmentITCase.scala
deleted file mode 100644
index bd1ce46..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableEnvironmentITCase.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableEnvironment, TableException, Row}
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class TableEnvironmentITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testSimpleRegister(): Unit = {
-
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet(tableName, ds)
-    val t = tEnv.scan(tableName).select('_1, '_2, '_3)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testRegisterWithFields(): Unit = {
-
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
-    val t = tEnv.scan(tableName).select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
-      "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
-      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRegisterExistingDataSet(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds1)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env)
-    // Must fail. Name is already in use.
-    tEnv.registerDataSet("MyTable", ds2)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testScanUnregisteredTable(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    // Must fail. No table registered under that name.
-    tEnv.scan("someTable")
-  }
-
-  @Test
-  def testTableRegister(): Unit = {
-
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    tEnv.registerTable(tableName, t)
-
-    val regT = tEnv.scan(tableName).select('a, 'b).filter('a > 8)
-
-    val expected = "9,4\n" + "10,4\n" +
-      "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
-      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" +
-      "19,6\n" + "20,6\n" + "21,6\n"
-
-    val results = regT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRegisterExistingTable(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-    tEnv.registerTable("MyTable", t1)
-    val t2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv)
-    // Must fail. Name is already in use.
-    tEnv.registerDataSet("MyTable", t2)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRegisterTableFromOtherEnv(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv1)
-    // Must fail. Table is bound to different TableEnvironment.
-    tEnv2.registerTable("MyTable", t1)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableSourceITCase.scala
deleted file mode 100644
index 8274d46..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/TableSourceITCase.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import java.io.{FileOutputStream, OutputStreamWriter, File}
-
-import org.apache.flink.api.common.io.GenericInputFormat
-import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.java.{ExecutionEnvironment => JavaExecEnv, DataSet => JavaSet}
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.table.sources.{CsvTableSource, BatchTableSource}
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, TableEnvironment}
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class TableSourceITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testBatchTableSourceTableAPI(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
-    val results = tEnv
-      .scan("MyTestTable")
-      .where('amount < 4)
-      .select('amount * 'id, 'name)
-      .toDataSet[Row].collect()
-
-    val expected = Seq(
-      "0,Record_0", "0,Record_16", "0,Record_32", "1,Record_1", "17,Record_17",
-      "36,Record_18", "4,Record_2", "57,Record_19", "9,Record_3").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testBatchTableSourceSQL(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
-    val results = tEnv.sql(
-      "SELECT amount * id, name FROM MyTestTable WHERE amount < 4")
-      .toDataSet[Row].collect()
-
-    val expected = Seq(
-      "0,Record_0", "0,Record_16", "0,Record_32", "1,Record_1", "17,Record_17",
-      "36,Record_18", "4,Record_2", "57,Record_19", "9,Record_3").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCsvTableSource(): Unit = {
-
-    val csvRecords = Seq(
-      "First#Id#Score#Last",
-      "Mike#1#12.3#Smith",
-      "Bob#2#45.6#Taylor",
-      "Sam#3#7.89#Miller",
-      "Peter#4#0.12#Smith",
-      "% Just a comment",
-      "Liz#5#34.5#Williams",
-      "Sally#6#6.78#Miller",
-      "Alice#7#90.1#Smith",
-      "Kelly#8#2.34#Williams"
-    )
-
-    val tempFile = File.createTempFile("csv-test", "tmp")
-    tempFile.deleteOnExit()
-    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
-    tmpWriter.write(csvRecords.mkString("$"))
-    tmpWriter.close()
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val csvTable = new CsvTableSource(
-      tempFile.getAbsolutePath,
-      Array("first", "id", "score", "last"),
-      Array(
-        BasicTypeInfo.STRING_TYPE_INFO,
-        BasicTypeInfo.INT_TYPE_INFO,
-        BasicTypeInfo.DOUBLE_TYPE_INFO,
-        BasicTypeInfo.STRING_TYPE_INFO
-      ),
-      fieldDelim = "#",
-      rowDelim = "$",
-      ignoreFirstLine = true,
-      ignoreComments = "%"
-    )
-
-    tEnv.registerTableSource("csvTable", csvTable)
-    val results = tEnv.sql(
-      "SELECT last, sum(score), max(id) FROM csvTable GROUP BY last")
-      .toDataSet[Row].collect()
-
-    val expected = Seq(
-      "Smith,102.52,7",
-      "Taylor,45.6,2",
-      "Miller,14.67,6",
-      "Williams,36.84,8").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}
-
-class TestBatchTableSource extends BatchTableSource[Row] {
-
-  val fieldTypes: Array[TypeInformation[_]] = Array(
-    BasicTypeInfo.STRING_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO,
-    BasicTypeInfo.INT_TYPE_INFO
-  )
-
-  /** Returns the data of the table as a [[DataSet]]. */
-  override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
-    execEnv.createInput(new GeneratingInputFormat(33), getReturnType).setParallelism(1)
-  }
-
-  /** Returns the types of the table fields. */
-  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
-  /** Returns the names of the table fields. */
-  override def getFieldsNames: Array[String] = Array("name", "id", "amount")
-
-  /** Returns the [[TypeInformation]] for the return type. */
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes)
-
-  /** Returns the number of fields of the table. */
-  override def getNumberOfFields: Int = 3
-}
-
-class GeneratingInputFormat(val num: Int) extends GenericInputFormat[Row] {
-
-  var cnt = 0L
-
-  override def reachedEnd(): Boolean = cnt >= num
-
-  override def nextRecord(reuse: Row): Row = {
-    reuse.setField(0, s"Record_$cnt")
-    reuse.setField(1, cnt)
-    reuse.setField(2, (cnt % 16).toInt)
-    cnt += 1
-    reuse
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ToTableITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ToTableITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ToTableITCase.scala
deleted file mode 100644
index f162846..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ToTableITCase.scala
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableEnvironment, Row}
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class ToTableITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testToTable(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testToTableFromCaseClass(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data = List(
-      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
-      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
-      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
-    val t =  env.fromCollection(data)
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-      .select('a, 'b, 'c, 'd)
-
-    val expected: String =
-      "Peter,28,4000.0,Sales\n" +
-      "Anna,56,10000.0,Engineering\n" +
-      "Lucy,42,6000.0,HR\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testToTableFromAndToCaseClass(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data = List(
-      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
-      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
-      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
-    val t =  env.fromCollection(data)
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-      .select('a, 'b, 'c, 'd)
-
-    val expected: String =
-      "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
-      "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
-      "SomeCaseClass(Lucy,42,6000.0,HR)\n"
-    val results = t.toDataSet[SomeCaseClass].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testToTableWithToFewFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Number of fields does not match.
-      .toTable(tEnv, 'a, 'b)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testToTableWithToManyFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Number of fields does not match.
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testToTableWithAmbiguousFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Field names not unique.
-      .toTable(tEnv, 'a, 'b, 'b)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testToTableWithNonFieldReference1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    // Must fail. as() can only have field references
-    CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a + 1, 'b, 'c)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testToTableWithNonFieldReference2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    // Must fail. as() can only have field references
-    CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a as 'foo, 'b, 'c)
-  }
-
-}
-
-case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
-  def this() { this("", 0, 0.0, "") }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
deleted file mode 100644
index 0448386..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableException, TableEnvironment, Row}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import scala.collection.JavaConverters._
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import TableProgramsTestBase.TableConfigMode
-
-@RunWith(classOf[Parameterized])
-class UnionITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testUnion(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTernaryUnion(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2).unionAll(ds3).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" +
-      "Hi\n" + "Hello\n" + "Hello world\n" +
-      "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUnionWithFilter(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    if (tEnv.getConfig.getEfficientTypeUsage) {
-      return
-    }
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
-    val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
-
-    val results = joinDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hallo\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testUnionDifferentFieldNames(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
-    // must fail. Union inputs have different field names.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testUnionDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Union inputs have different field types.
-    ds1.unionAll(ds2)
-  }
-
-  @Test
-  def testUnionWithAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    if (tEnv.getConfig.getEfficientTypeUsage) {
-      return
-    }
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
-    val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).select('c.count)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "18"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUnionWithJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    if (tEnv.getConfig.getEfficientTypeUsage) {
-      return
-    }
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv,'a, 'b, 'd, 'c, 'e)
-    val ds3 = CollectionDataSets.getSmall5TupleDataSet(env).toTable(tEnv, 'a2, 'b2, 'd2, 'c2, 'e2)
-
-    val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c))
-      .join(ds3.select('a2, 'b2, 'c2))
-      .where('a ==='a2).select('c, 'c2)
-
-    val results = joinDs.toDataSet[Row].collect()
-    val expected = "Hi,Hallo\n" + "Hallo,Hallo\n" +
-      "Hello,Hallo Welt\n" + "Hello,Hallo Welt wie\n" +
-      "Hallo Welt,Hallo Welt\n" + "Hallo Welt wie,Hallo Welt\n" +
-      "Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testUnionTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.unionAll(ds2).select('c)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowComparatorTest.scala
new file mode 100644
index 0000000..9ceb9d2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowComparatorTest.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.tuple
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.scala.typeutils.RowComparatorTest.MyPojo
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.junit.Assert._
+
+class RowComparatorTest extends ComparatorTestBase[Row] {
+
+  val typeInfo = new RowTypeInfo(
+    Array(
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.DOUBLE_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      new TupleTypeInfo[tuple.Tuple2[Int, Boolean]](
+        BasicTypeInfo.INT_TYPE_INFO,
+        BasicTypeInfo.BOOLEAN_TYPE_INFO,
+        BasicTypeInfo.SHORT_TYPE_INFO),
+      TypeExtractor.createTypeInfo(classOf[MyPojo])))
+
+  val testPojo1 = new MyPojo()
+  // TODO we cannot test null here as PojoComparator has no support for null keys
+  testPojo1.name = ""
+  val testPojo2 = new MyPojo()
+  testPojo2.name = "Test1"
+  val testPojo3 = new MyPojo()
+  testPojo3.name = "Test2"
+
+  val data: Array[Row] = Array(
+    createRow(null, null, null, null, null),
+    createRow(0, null, null, null, null),
+    createRow(0, 0.0, null, null, null),
+    createRow(0, 0.0, "a", null, null),
+    createRow(1, 0.0, "a", null, null),
+    createRow(1, 1.0, "a", null, null),
+    createRow(1, 1.0, "b", null, null),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3)
+    )
+
+  override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
+    val arity = should.productArity
+    assertEquals(message, arity, is.productArity)
+    var index = 0
+    while (index < arity) {
+      val copiedValue: Any = should.productElement(index)
+      val element: Any = is.productElement(index)
+      assertEquals(message, element, copiedValue)
+      index += 1
+    }
+  }
+
+  override protected def createComparator(ascending: Boolean): TypeComparator[Row] = {
+    typeInfo.createComparator(
+      Array(0, 1, 2, 3, 4, 5, 6),
+      Array(ascending, ascending, ascending, ascending, ascending, ascending, ascending),
+      0,
+      new ExecutionConfig())
+  }
+
+  override protected def createSerializer(): TypeSerializer[Row] = {
+    typeInfo.createSerializer(new ExecutionConfig())
+  }
+
+  override protected def getSortedTestData: Array[Row] = {
+    data
+  }
+
+  override protected def supportsNullKeys: Boolean = true
+
+  def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = {
+    val r: Row = new Row(5)
+    r.setField(0, f0)
+    r.setField(1, f1)
+    r.setField(2, f2)
+    r.setField(3, f3)
+    r.setField(4, f4)
+    r
+  }
+}
+
+object RowComparatorTest {
+
+  class MyPojo() extends Serializable with Comparable[MyPojo] {
+    // we cannot use null because the PojoComparator does not support null properly
+    var name: String = ""
+
+    override def compareTo(o: MyPojo): Int = {
+      if (name == null && o.name == null) {
+        0
+      }
+      else if (name == null) {
+        -1
+      }
+      else if (o.name == null) {
+        1
+      }
+      else {
+        name.compareTo(o.name)
+      }
+    }
+
+    override def equals(other: Any): Boolean = other match {
+      case that: MyPojo => compareTo(that) == 0
+      case _ => false
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowSerializerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowSerializerTest.scala
new file mode 100644
index 0000000..95a1bb5
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowSerializerTest.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer}
+import org.apache.flink.api.java.tuple
+import org.apache.flink.api.java.typeutils.{TypeExtractor, TupleTypeInfo}
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.typeutils.RowSerializerTest.MyPojo
+import org.junit.Assert._
+import org.junit.Test
+
+class RowSerializerTest {
+
+  class RowSerializerTestInstance(
+      serializer: TypeSerializer[Row],
+      testData: Array[Row])
+    extends SerializerTestInstance[Row](serializer, classOf[Row], -1, testData: _*) {
+
+    override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
+      val arity = should.productArity
+      assertEquals(message, arity, is.productArity)
+      var index = 0
+      while (index < arity) {
+        val copiedValue: Any = should.productElement(index)
+        val element: Any = is.productElement(index)
+        assertEquals(message, element, copiedValue)
+        index += 1
+      }
+    }
+  }
+
+  @Test
+  def testRowSerializer(): Unit = {
+    val rowInfo: TypeInformation[Row] = new RowTypeInfo(
+      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
+
+    val row1 = new Row(2)
+    row1.setField(0, 1)
+    row1.setField(1, "a")
+
+    val row2 = new Row(2)
+    row2.setField(0, 2)
+    row2.setField(1, null)
+
+    val testData: Array[Row] = Array(row1, row2)
+
+    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
+
+    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
+
+    testInstance.testAll()
+  }
+
+  @Test
+  def testLargeRowSerializer(): Unit = {
+    val rowInfo: TypeInformation[Row] = new RowTypeInfo(Seq(
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO))
+
+    val row = new Row(13)
+    row.setField(0, 2)
+    row.setField(1, null)
+    row.setField(3, null)
+    row.setField(4, null)
+    row.setField(5, null)
+    row.setField(6, null)
+    row.setField(7, null)
+    row.setField(8, null)
+    row.setField(9, null)
+    row.setField(10, null)
+    row.setField(11, null)
+    row.setField(12, "Test")
+
+    val testData: Array[Row] = Array(row)
+
+    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
+
+    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
+
+    testInstance.testAll()
+  }
+
+  @Test
+  def testRowSerializerWithComplexTypes(): Unit = {
+    val rowInfo = new RowTypeInfo(
+      Array(
+        BasicTypeInfo.INT_TYPE_INFO,
+        BasicTypeInfo.DOUBLE_TYPE_INFO,
+        BasicTypeInfo.STRING_TYPE_INFO,
+        new TupleTypeInfo[tuple.Tuple2[Int, Boolean]](
+          BasicTypeInfo.INT_TYPE_INFO,
+          BasicTypeInfo.BOOLEAN_TYPE_INFO,
+          BasicTypeInfo.SHORT_TYPE_INFO),
+        TypeExtractor.createTypeInfo(classOf[MyPojo])))
+
+    val testPojo1 = new MyPojo()
+    testPojo1.name = null
+    val testPojo2 = new MyPojo()
+    testPojo2.name = "Test1"
+    val testPojo3 = new MyPojo()
+    testPojo3.name = "Test2"
+
+    val testData: Array[Row] = Array(
+      createRow(null, null, null, null, null),
+      createRow(0, null, null, null, null),
+      createRow(0, 0.0, null, null, null),
+      createRow(0, 0.0, "a", null, null),
+      createRow(1, 0.0, "a", null, null),
+      createRow(1, 1.0, "a", null, null),
+      createRow(1, 1.0, "b", null, null),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3)
+    )
+
+    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
+
+    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
+
+    testInstance.testAll()
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = {
+    val r: Row = new Row(5)
+    r.setField(0, f0)
+    r.setField(1, f1)
+    r.setField(2, f2)
+    r.setField(3, f3)
+    r.setField(4, f4)
+    r
+  }
+}
+
+object RowSerializerTest {
+  class MyPojo() extends Serializable with Comparable[MyPojo] {
+    var name: String = null
+
+    override def compareTo(o: MyPojo): Int = {
+      if (name == null && o.name == null) {
+        0
+      }
+      else if (name == null) {
+        -1
+      }
+      else if (o.name == null) {
+        1
+      }
+      else {
+        name.compareTo(o.name)
+      }
+    }
+
+    override def equals(other: Any): Boolean = other match {
+      case that: MyPojo => compareTo(that) == 0
+      case _ => false
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
deleted file mode 100644
index 8f242e9..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.test
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.expressions.{ExpressionParser, Expression}
-import org.apache.flink.api.table.test.utils.ExpressionEvaluator
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.junit.Assert.assertEquals
-import org.junit.Test
-
-class ScalarFunctionsTest {
-
-  // ----------------------------------------------------------------------------------------------
-  // String functions
-  // ----------------------------------------------------------------------------------------------
-
-  @Test
-  def testSubstring(): Unit = {
-    testFunction(
-      'f0.substring(2),
-      "f0.substring(2)",
-      "SUBSTRING(f0, 2)",
-      "his is a test String.")
-
-    testFunction(
-      'f0.substring(2, 5),
-      "f0.substring(2, 5)",
-      "SUBSTRING(f0, 2, 5)",
-      "his i")
-
-    testFunction(
-      'f0.substring(1, 'f7),
-      "f0.substring(1, f7)",
-      "SUBSTRING(f0, 1, f7)",
-      "Thi")
-  }
-
-  @Test
-  def testTrim(): Unit = {
-    testFunction(
-      'f8.trim(),
-      "f8.trim()",
-      "TRIM(f8)",
-      "This is a test String.")
-
-    testFunction(
-      'f8.trim(removeLeading = true, removeTrailing = true, " "),
-      "trim(f8)",
-      "TRIM(f8)",
-      "This is a test String.")
-
-    testFunction(
-      'f8.trim(removeLeading = false, removeTrailing = true, " "),
-      "f8.trim(TRAILING, ' ')",
-      "TRIM(TRAILING FROM f8)",
-      " This is a test String.")
-
-    testFunction(
-      'f0.trim(removeLeading = true, removeTrailing = true, "."),
-      "trim(BOTH, '.', f0)",
-      "TRIM(BOTH '.' FROM f0)",
-      "This is a test String")
-  }
-
-  @Test
-  def testCharLength(): Unit = {
-    testFunction(
-      'f0.charLength(),
-      "f0.charLength()",
-      "CHAR_LENGTH(f0)",
-      "22")
-
-    testFunction(
-      'f0.charLength(),
-      "charLength(f0)",
-      "CHARACTER_LENGTH(f0)",
-      "22")
-  }
-
-  @Test
-  def testUpperCase(): Unit = {
-    testFunction(
-      'f0.upperCase(),
-      "f0.upperCase()",
-      "UPPER(f0)",
-      "THIS IS A TEST STRING.")
-  }
-
-  @Test
-  def testLowerCase(): Unit = {
-    testFunction(
-      'f0.lowerCase(),
-      "f0.lowerCase()",
-      "LOWER(f0)",
-      "this is a test string.")
-  }
-
-  @Test
-  def testInitCap(): Unit = {
-    testFunction(
-      'f0.initCap(),
-      "f0.initCap()",
-      "INITCAP(f0)",
-      "This Is A Test String.")
-  }
-
-  @Test
-  def testConcat(): Unit = {
-    testFunction(
-      'f0 + 'f0,
-      "f0 + f0",
-      "f0||f0",
-      "This is a test String.This is a test String.")
-  }
-
-  @Test
-  def testLike(): Unit = {
-    testFunction(
-      'f0.like("Th_s%"),
-      "f0.like('Th_s%')",
-      "f0 LIKE 'Th_s%'",
-      "true")
-
-    testFunction(
-      'f0.like("%is a%"),
-      "f0.like('%is a%')",
-      "f0 LIKE '%is a%'",
-      "true")
-  }
-
-  @Test
-  def testNotLike(): Unit = {
-    testFunction(
-      !'f0.like("Th_s%"),
-      "!f0.like('Th_s%')",
-      "f0 NOT LIKE 'Th_s%'",
-      "false")
-
-    testFunction(
-      !'f0.like("%is a%"),
-      "!f0.like('%is a%')",
-      "f0 NOT LIKE '%is a%'",
-      "false")
-  }
-
-  @Test
-  def testSimilar(): Unit = {
-    testFunction(
-      'f0.similar("_*"),
-      "f0.similar('_*')",
-      "f0 SIMILAR TO '_*'",
-      "true")
-
-    testFunction(
-      'f0.similar("This (is)? a (test)+ Strin_*"),
-      "f0.similar('This (is)? a (test)+ Strin_*')",
-      "f0 SIMILAR TO 'This (is)? a (test)+ Strin_*'",
-      "true")
-  }
-
-  @Test
-  def testNotSimilar(): Unit = {
-    testFunction(
-      !'f0.similar("_*"),
-      "!f0.similar('_*')",
-      "f0 NOT SIMILAR TO '_*'",
-      "false")
-
-    testFunction(
-      !'f0.similar("This (is)? a (test)+ Strin_*"),
-      "!f0.similar('This (is)? a (test)+ Strin_*')",
-      "f0 NOT SIMILAR TO 'This (is)? a (test)+ Strin_*'",
-      "false")
-  }
-
-  @Test
-  def testMod(): Unit = {
-    testFunction(
-      'f4.mod('f7),
-      "f4.mod(f7)",
-      "MOD(f4, f7)",
-      "2")
-
-    testFunction(
-      'f4.mod(3),
-      "mod(f4, 3)",
-      "MOD(f4, 3)",
-      "2")
-
-    testFunction(
-      'f4 % 3,
-      "mod(44, 3)",
-      "MOD(44, 3)",
-      "2")
-
-  }
-
-  @Test
-  def testExp(): Unit = {
-    testFunction(
-      'f2.exp(),
-      "f2.exp()",
-      "EXP(f2)",
-      math.exp(42.toByte).toString)
-
-    testFunction(
-      'f3.exp(),
-      "f3.exp()",
-      "EXP(f3)",
-      math.exp(43.toShort).toString)
-
-    testFunction(
-      'f4.exp(),
-      "f4.exp()",
-      "EXP(f4)",
-      math.exp(44.toLong).toString)
-
-    testFunction(
-      'f5.exp(),
-      "f5.exp()",
-      "EXP(f5)",
-      math.exp(4.5.toFloat).toString)
-
-    testFunction(
-      'f6.exp(),
-      "f6.exp()",
-      "EXP(f6)",
-      math.exp(4.6).toString)
-
-    testFunction(
-      'f7.exp(),
-      "exp(3)",
-      "EXP(3)",
-      math.exp(3).toString)
-  }
-
-  @Test
-  def testLog10(): Unit = {
-    testFunction(
-      'f2.log10(),
-      "f2.log10()",
-      "LOG10(f2)",
-      math.log10(42.toByte).toString)
-
-    testFunction(
-      'f3.log10(),
-      "f3.log10()",
-      "LOG10(f3)",
-      math.log10(43.toShort).toString)
-
-    testFunction(
-      'f4.log10(),
-      "f4.log10()",
-      "LOG10(f4)",
-      math.log10(44.toLong).toString)
-
-    testFunction(
-      'f5.log10(),
-      "f5.log10()",
-      "LOG10(f5)",
-      math.log10(4.5.toFloat).toString)
-
-    testFunction(
-      'f6.log10(),
-      "f6.log10()",
-      "LOG10(f6)",
-      math.log10(4.6).toString)
-  }
-
-  @Test
-  def testPower(): Unit = {
-    testFunction(
-      'f2.power('f7),
-      "f2.power(f7)",
-      "POWER(f2, f7)",
-      math.pow(42.toByte, 3).toString)
-
-    testFunction(
-      'f3.power('f6),
-      "f3.power(f6)",
-      "POWER(f3, f6)",
-      math.pow(43.toShort, 4.6D).toString)
-
-    testFunction(
-      'f4.power('f5),
-      "f4.power(f5)",
-      "POWER(f4, f5)",
-      math.pow(44.toLong, 4.5.toFloat).toString)
-  }
-
-  @Test
-  def testLn(): Unit = {
-    testFunction(
-      'f2.ln(),
-      "f2.ln()",
-      "LN(f2)",
-      math.log(42.toByte).toString)
-
-    testFunction(
-      'f3.ln(),
-      "f3.ln()",
-      "LN(f3)",
-      math.log(43.toShort).toString)
-
-    testFunction(
-      'f4.ln(),
-      "f4.ln()",
-      "LN(f4)",
-      math.log(44.toLong).toString)
-
-    testFunction(
-      'f5.ln(),
-      "f5.ln()",
-      "LN(f5)",
-      math.log(4.5.toFloat).toString)
-
-    testFunction(
-      'f6.ln(),
-      "f6.ln()",
-      "LN(f6)",
-      math.log(4.6).toString)
-  }
-
-  @Test
-  def testAbs(): Unit = {
-    testFunction(
-      'f2.abs(),
-      "f2.abs()",
-      "ABS(f2)",
-      "42")
-
-    testFunction(
-      'f3.abs(),
-      "f3.abs()",
-      "ABS(f3)",
-      "43")
-
-    testFunction(
-      'f4.abs(),
-      "f4.abs()",
-      "ABS(f4)",
-      "44")
-
-    testFunction(
-      'f5.abs(),
-      "f5.abs()",
-      "ABS(f5)",
-      "4.5")
-
-    testFunction(
-      'f6.abs(),
-      "f6.abs()",
-      "ABS(f6)",
-      "4.6")
-
-    testFunction(
-      'f9.abs(),
-      "f9.abs()",
-      "ABS(f9)",
-      "42")
-
-    testFunction(
-      'f10.abs(),
-      "f10.abs()",
-      "ABS(f10)",
-      "43")
-
-    testFunction(
-      'f11.abs(),
-      "f11.abs()",
-      "ABS(f11)",
-      "44")
-
-    testFunction(
-      'f12.abs(),
-      "f12.abs()",
-      "ABS(f12)",
-      "4.5")
-
-    testFunction(
-      'f13.abs(),
-      "f13.abs()",
-      "ABS(f13)",
-      "4.6")
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  def testFunction(
-      expr: Expression,
-      exprString: String,
-      sqlExpr: String,
-      expected: String): Unit = {
-    val testData = new Row(15)
-    testData.setField(0, "This is a test String.")
-    testData.setField(1, true)
-    testData.setField(2, 42.toByte)
-    testData.setField(3, 43.toShort)
-    testData.setField(4, 44.toLong)
-    testData.setField(5, 4.5.toFloat)
-    testData.setField(6, 4.6)
-    testData.setField(7, 3)
-    testData.setField(8, " This is a test String. ")
-    testData.setField(9, -42.toByte)
-    testData.setField(10, -43.toShort)
-    testData.setField(11, -44.toLong)
-    testData.setField(12, -4.5.toFloat)
-    testData.setField(13, -4.6)
-    testData.setField(14, -3)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      STRING_TYPE_INFO,
-      BOOLEAN_TYPE_INFO,
-      BYTE_TYPE_INFO,
-      SHORT_TYPE_INFO,
-      LONG_TYPE_INFO,
-      FLOAT_TYPE_INFO,
-      DOUBLE_TYPE_INFO,
-      INT_TYPE_INFO,
-      STRING_TYPE_INFO,
-      BYTE_TYPE_INFO,
-      SHORT_TYPE_INFO,
-      LONG_TYPE_INFO,
-      FLOAT_TYPE_INFO,
-      DOUBLE_TYPE_INFO,
-      INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
-
-    val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr)
-    assertEquals(expected, exprResult)
-
-    val exprStringResult = ExpressionEvaluator.evaluate(
-      testData,
-      typeInfo,
-      ExpressionParser.parseExpression(exprString))
-    assertEquals(expected, exprStringResult)
-
-    val exprSqlResult = ExpressionEvaluator.evaluate(testData, typeInfo, sqlExpr)
-    assertEquals(expected, exprSqlResult)
-  }
-
-
-
-}


Mime
View raw message