flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [24/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
Date Thu, 13 Jul 2017 10:18:33 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/AggregationsITCase.scala
deleted file mode 100644
index 408db25..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/AggregationsITCase.scala
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.table
-
-import java.math.BigDecimal
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMergeAndReset
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.functions.aggfunctions.CountAggFunction
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class AggregationsITCase(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  @Test
-  def testAggregationTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      .select('_1.sum, '_1.sum0, '_1.min, '_1.max, '_1.count, '_1.avg)
-
-    val results = t.toDataSet[Row].collect()
-    val expected = "231,231,1,21,21,11"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
-      .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
-
-    val expected = "1,1,1,1,1.5,1.5,2"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testProjection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements(
-      (1: Byte, 1: Short),
-      (2: Byte, 2: Short)).toTable(tEnv)
-      .select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
-
-    val expected = "1,3,2,1,3"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAggregationWithArithmetic(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
-      .select(('_1 + 2).avg + 2, '_2.count + 5)
-
-    val expected = "5.5,7"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAggregationWithTwoCount(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
-      .select('_1.count, '_2.count)
-
-    val expected = "2,2"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAggregationAfterProjection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
-      .select('_1, '_2, '_3)
-      .select('_1.avg, '_2.sum, '_3.count)
-
-    val expected = "1,3,2"
-    val result = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testSQLStyleAggregations(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select(
-        """Sum( a) as a1, a.sum as a2,
-          |Min (a) as b1, a.min as b2,
-          |Max (a ) as c1, a.max as c2,
-          |Avg ( a ) as d1, a.avg as d2,
-          |Count(a) as e1, a.count as e2
-        """.stripMargin)
-
-    val expected = "231,231,1,1,21,21,11,11,21,21"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testPojoAggregation(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val input = env.fromElements(
-      WC("hello", 1),
-      WC("hello", 1),
-      WC("ciao", 1),
-      WC("hola", 1),
-      WC("hola", 1))
-    val expr = input.toTable(tEnv)
-    val result = expr
-      .groupBy('word)
-      .select('word, 'frequency.sum as 'frequency)
-      .filter('frequency === 2)
-      .toDataSet[WC]
-
-    val mappedResult = result.map(w => (w.word, w.frequency * 10)).collect()
-    val expected = "(hello,20)\n" + "(hola,20)"
-    TestBaseUtils.compareResultAsText(mappedResult.asJava, expected)
-  }
-
-  @Test
-  def testDistinct(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val distinct = ds.select('b).distinct()
-
-    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-    val results = distinct.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testDistinctAfterAggregate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-    val distinct = ds.groupBy('a, 'e).select('e).distinct()
-
-    val expected = "1\n" + "2\n" + "3\n"
-    val results = distinct.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregate(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    val countFun = new CountAggFunction
-    val wAvgFun = new WeightedAvgWithMergeAndReset
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum, countFun('c), wAvgFun('b, 'a), wAvgFun('a, 'a))
-
-    val expected = "1,1,1,1,1\n" + "2,5,2,2,2\n" + "3,15,3,3,5\n" + "4,34,4,4,8\n" +
-      "5,65,5,5,13\n" + "6,111,6,6,18\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupingKeyForwardIfNotUsed(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum)
-
-    val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupNoAggregation(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum as 'd, 'b)
-      .groupBy('b, 'd)
-      .select('b)
-
-    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithLongKeys(): Unit = {
-    // This uses very long keys to force serialized comparison.
-    // With short keys, the normalized key is sufficient.
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = env.fromElements(
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2))
-      .rebalance().setParallelism(2).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('a, 'b)
-      .select('c.sum)
-
-    val expected = "10\n" + "8\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithConstant1(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 4 as 'four, 'b)
-      .groupBy('four, 'a)
-      .select('four, 'b.sum)
-
-    val expected = "4,2\n" + "4,3\n" + "4,5\n" + "4,5\n" + "4,5\n" + "4,6\n" +
-      "4,6\n" + "4,6\n" + "4,3\n" + "4,4\n" + "4,6\n" + "4,1\n" + "4,4\n" +
-      "4,4\n" + "4,5\n" + "4,6\n" + "4,2\n" + "4,3\n" + "4,4\n" + "4,5\n" + "4,6\n"
-    val results = t.toDataSet[Row].collect()
-
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithConstant2(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('b, 4 as 'four, 'a)
-      .groupBy('b, 'four)
-      .select('four, 'a.sum)
-
-    val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithExpression(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .groupBy('e, 'b % 3)
-      .select('c.min, 'e, 'a.avg, 'd.count)
-
-    val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" +
-      "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithFilter(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum)
-      .where('b === 2)
-
-    val expected = "2,5\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAnalyticAggregation(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val ds = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, BigDecimal.ONE),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, new BigDecimal(2))).toTable(tEnv)
-    val res = ds.select(
-      '_1.stddevPop, '_2.stddevPop, '_3.stddevPop, '_4.stddevPop, '_5.stddevPop,
-      '_6.stddevPop, '_7.stddevPop,
-      '_1.stddevSamp, '_2.stddevSamp, '_3.stddevSamp, '_4.stddevSamp, '_5.stddevSamp,
-      '_6.stddevSamp, '_7.stddevSamp,
-      '_1.varPop, '_2.varPop, '_3.varPop, '_4.varPop, '_5.varPop,
-      '_6.varPop, '_7.varPop,
-      '_1.varSamp, '_2.varSamp, '_3.varSamp, '_4.varSamp, '_5.varSamp,
-      '_6.varSamp, '_7.varSamp)
-    val expected =
-        "0,0,0," +
-        "0,0.5,0.5,0.5," +
-        "1,1,1," +
-        "1,0.70710677,0.7071067811865476,0.7071067811865476," +
-        "0,0,0," +
-        "0,0.25,0.25,0.25," +
-        "1,1,1," +
-        "1,0.5,0.5,0.5"
-    val results = res.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}
-
-case class WC(word: String, frequency: Long)

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/CalcITCase.scala
deleted file mode 100644
index 72e74c4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/CalcITCase.scala
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.table
-
-import java.sql.{Date, Time, Timestamp}
-import java.util
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class CalcITCase(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).select('_1, '_2, '_3)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('a, 'b, 'c)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
-      .select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectRenameAll(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
-      .select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSelectStar(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-
-    val expected = "\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnStringTupleField(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val filterDs = ds.filter( 'c.like("%world%") )
-
-    val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-
-    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
-      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
-      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testNotEquals(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 !== 0)
-    val expected = "1,1,Hi\n" + "3,2,Hello world\n" +
-      "5,3,I am fine.\n" + "7,4,Comment#1\n" + "9,4,Comment#3\n" +
-      "11,5,Comment#5\n" + "13,5,Comment#7\n" + "15,5,Comment#9\n" +
-      "17,6,Comment#11\n" + "19,6,Comment#13\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testDisjunctivePredicate(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a < 2 || 'a > 20)
-    val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testConsecutiveFilters(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
-    val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
-      "9,4,Comment#3\n" + "17,6,Comment#11\n" +
-      "19,6,Comment#13\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterBasicType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.getStringDataSet(env)
-
-    val filterDs = ds.toTable(tEnv, 'a).filter( 'a.like("H%") )
-
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnCustomType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.getCustomTypeDataSet(env)
-    val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
-      .filter( 's.like("%a%") )
-
-    val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleCalc(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1, '_2, '_3)
-        .where('_1 < 7)
-        .select('_1, '_3)
-
-    val expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
-      "4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n"
-      val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCalcWithTwoFilters(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1, '_2, '_3)
-        .where('_1 < 7 && '_2 === 3)
-        .select('_1, '_3)
-        .where('_1 === 4)
-        .select('_1)
-
-    val expected = "4\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCalcWithAggregation(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1, '_2, '_3)
-        .where('_1 < 15)
-        .groupBy('_2)
-        .select('_1.min, '_2.count as 'cnt)
-        .where('cnt > 3)
-
-    val expected = "7,4\n" + "11,4\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCalcJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
-      .where('b > 1).select('a, 'd).where('d === 2)
-
-    val expected = "2,2\n" + "3,2\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAdvancedDataTypes(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env
-      .fromElements((
-        BigDecimal("78.454654654654654").bigDecimal,
-        BigDecimal("4E+9999").bigDecimal,
-        Date.valueOf("1984-07-12"),
-        Time.valueOf("14:34:24"),
-        Timestamp.valueOf("1984-07-12 14:34:24")))
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
-        Date.valueOf("1984-07-12"), Time.valueOf("14:34:24"),
-        Timestamp.valueOf("1984-07-12 14:34:24"))
-
-    val expected = "78.454654654654654,4E+9999,1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
-      "11.2,11.2,1984-07-12,14:34:24,1984-07-12 14:34:24.0"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUserDefinedScalarFunction() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    tableEnv.registerFunction("hashCode", OldHashCode)
-    tableEnv.registerFunction("hashCode", HashCode)
-    val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text)
-    val result = table.select("text.hashCode()")
-    val results = result.toDataSet[Row].collect()
-    val expected = "97\n98\n99"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}
-
-object CalcITCase {
-
-  @Parameterized.Parameters(name = "Table config = {0}")
-  def parameters(): util.Collection[Array[java.lang.Object]] = {
-    Seq[Array[AnyRef]](
-      Array(TableProgramsTestBase.DEFAULT),
-      Array(TableProgramsTestBase.NO_NULL)).asJava
-  }
-}
-
-object HashCode extends ScalarFunction {
-  def eval(s: String): Int = s.hashCode
-}
-
-object OldHashCode extends ScalarFunction {
-  def eval(s: String): Int = -1
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/CastingITCase.scala
deleted file mode 100644
index dbc233b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/CastingITCase.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.Types._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class CastingITCase(configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  @Test
-  def testNumericAutocastInArithmetic() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements(
-      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tableEnv)
-      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f,
-        '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
-
-    val results = table.toDataSet[Row].collect()
-    val expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1"
-    compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  @throws[Exception]
-  def testNumericAutocastInComparison() {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements(
-      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d),
-      (2.toByte, 2.toShort, 2, 2L, 2.0f, 2.0d))
-      .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e, 'f)
-      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1)
-
-    val results = table.toDataSet[Row].collect()
-    val expected: String = "2,2,2,2,2.0,2.0"
-    compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  @throws[Exception]
-  def testCasting() {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val table = env.fromElements((1, 0.0, 1L, true)).toTable(tableEnv)
-      .select(
-        // * -> String
-      '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING),
-        // NUMERIC TYPE -> Boolean
-      '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), '_3.cast(BOOLEAN),
-        // NUMERIC TYPE -> NUMERIC TYPE
-      '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT),
-        // Boolean -> NUMERIC TYPE
-      '_4.cast(DOUBLE),
-        // identity casting
-      '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN))
-
-    val results = table.toDataSet[Row].collect()
-    val expected = "1,0.0,1,true," + "true,false,true," +
-      "1.0,0,1," + "1.0," + "1,0.0,1,true\n"
-    compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  @throws[Exception]
-  def testCastFromString() {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val table = env.fromElements(("1", "true", "2.0")).toTable(tableEnv)
-      .select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG),
-        '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN))
-
-    val results = table.toDataSet[Row].collect()
-    val expected = "1,1,1,1,2.0,2.0,true\n"
-    compareResultAsText(results.asJava, expected)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/DataSetCalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/DataSetCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/DataSetCalcITCase.scala
deleted file mode 100644
index 2350e8a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/DataSetCalcITCase.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.expressions.utils.{RichFunc1, RichFunc2, RichFunc3}
-import org.apache.flink.table.utils._
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class DataSetCalcITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsClusterTestBase(mode, configMode) {
-
-  @Test
-  def testUserDefinedScalarFunctionWithParameter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.registerFunction("RichFunc2", new RichFunc2)
-    UserDefinedFunctionTestUtils.setJobParameters(env, Map("string.value" -> "ABC"))
-
-    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
-    tEnv.registerDataSet("t1", ds, 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT c FROM t1 where RichFunc2(c)='ABC#Hello'"
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "Hello"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUserDefinedScalarFunctionWithDistributedCache(): Unit = {
-    val words = "Hello\nWord"
-    val filePath = UserDefinedFunctionTestUtils.writeCacheFile("test_words", words)
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    env.registerCachedFile(filePath, "words")
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.registerFunction("RichFunc3", new RichFunc3)
-
-    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
-    tEnv.registerDataSet("t1", ds, 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT c FROM t1 where RichFunc3(c)=true"
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "Hello"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testMultipleUserDefinedScalarFunctions(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.registerFunction("RichFunc1", new RichFunc1)
-    tEnv.registerFunction("RichFunc2", new RichFunc2)
-    UserDefinedFunctionTestUtils.setJobParameters(env, Map("string.value" -> "Abc"))
-
-    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
-    tEnv.registerDataSet("t1", ds, 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT c FROM t1 where " +
-      "RichFunc2(c)='Abc#Hello' or RichFunc1(a)=3 and b=2"
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "Hello\nHello world"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/DataSetUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/DataSetUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/DataSetUserDefinedFunctionITCase.scala
deleted file mode 100644
index 611f521..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/DataSetUserDefinedFunctionITCase.scala
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.dataset.table
-
-import java.sql.{Date, Timestamp}
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.java.utils.UserDefinedTableFunctions.JavaTableFunc0
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.expressions.utils.{Func1, Func13, Func18, RichFunc2}
-import org.apache.flink.table.utils._
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-@RunWith(classOf[Parameterized])
-class DataSetUserDefinedFunctionITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-    extends TableProgramsClusterTestBase(mode, configMode) {
-
-  @Test
-  def testCrossJoin(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-
-    val func1 = new TableFunc1
-    val result = in.join(func1('c) as 's).select('c, 's).toDataSet[Row]
-    val results = result.collect()
-    val expected = "Jack#22,Jack\n" + "Jack#22,22\n" + "John#19,John\n" + "John#19,19\n" +
-      "Anna#44,Anna\n" + "Anna#44,44\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-
-    // with overloading
-    val result2 = in.join(func1('c, "$") as 's).select('c, 's).toDataSet[Row]
-    val results2 = result2.collect()
-    val expected2 = "Jack#22,$Jack\n" + "Jack#22,$22\n" + "John#19,$John\n" +
-      "John#19,$19\n" + "Anna#44,$Anna\n" + "Anna#44,$44\n"
-    TestBaseUtils.compareResultAsText(results2.asJava, expected2)
-  }
-
-  @Test
-  def testLeftOuterJoin(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-
-    val func2 = new TableFunc2
-    val result = in.leftOuterJoin(func2('c) as ('s, 'l)).select('c, 's, 'l).toDataSet[Row]
-    val results = result.collect()
-    val expected = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
-      "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n" + "nosharp,null,null"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testWithFilter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-    val func0 = new TableFunc0
-
-    val result = in
-      .join(func0('c) as ('name, 'age))
-      .select('c, 'name, 'age)
-      .filter('age > 20)
-      .toDataSet[Row]
-
-    val results = result.collect()
-    val expected = "Jack#22,Jack,22\n" + "Anna#44,Anna,44\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCustomReturnType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-    val func2 = new TableFunc2
-
-    val result = in
-      .join(func2('c) as ('name, 'len))
-      .select('c, 'name, 'len)
-      .toDataSet[Row]
-
-    val results = result.collect()
-    val expected = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
-      "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testHierarchyType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-
-    val hierarchy = new HierarchyTableFunction
-    val result = in
-      .join(hierarchy('c) as ('name, 'adult, 'len))
-      .select('c, 'name, 'adult, 'len)
-      .toDataSet[Row]
-
-    val results = result.collect()
-    val expected = "Jack#22,Jack,true,22\n" + "John#19,John,false,19\n" +
-      "Anna#44,Anna,true,44\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testPojoType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-
-    val pojo = new PojoTableFunc()
-    val result = in
-      .join(pojo('c))
-      .where('age > 20)
-      .select('c, 'name, 'age)
-      .toDataSet[Row]
-
-    val results = result.collect()
-    val expected = "Jack#22,Jack,22\n" + "Anna#44,Anna,44\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUserDefinedTableFunctionWithScalarFunction(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-    val func1 = new TableFunc1
-
-    val result = in
-      .join(func1('c.substring(2)) as 's)
-      .select('c, 's)
-      .toDataSet[Row]
-
-    val results = result.collect()
-    val expected = "Jack#22,ack\n" + "Jack#22,22\n" + "John#19,ohn\n" + "John#19,19\n" +
-      "Anna#44,nna\n" + "Anna#44,44\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUserDefinedTableFunctionWithScalarFunctionInCondition(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-    val func0 = new TableFunc0
-
-    val result = in
-      .join(func0('c))
-      .where(Func18('name, "J") && (Func1('a) < 3) && Func1('age) > 20)
-      .select('c, 'name, 'age)
-      .toDataSet[Row]
-
-    val results = result.collect()
-    val expected = "Jack#22,Jack,22"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testLongAndTemporalTypes(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-    val func0 = new JavaTableFunc0
-
-    val result = in
-        .where('a === 1)
-        .select(Date.valueOf("1990-10-14") as 'x,
-                1000L as 'y,
-                Timestamp.valueOf("1990-10-14 12:10:10") as 'z)
-        .join(func0('x, 'y, 'z) as 's)
-        .select('s)
-        .toDataSet[Row]
-
-    val results = result.collect()
-    val expected = "1000\n" + "655906210000\n" + "7591\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUserDefinedTableFunctionWithParameter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val richTableFunc1 = new RichTableFunc1
-    tEnv.registerFunction("RichTableFunc1", richTableFunc1)
-    UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> "#"))
-
-    val result = testData(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .join(richTableFunc1('c) as 's)
-      .select('a, 's)
-
-    val expected = "1,Jack\n" + "1,22\n" + "2,John\n" + "2,19\n" + "3,Anna\n" + "3,44"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUserDefinedTableFunctionWithScalarFunctionWithParameters(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val richTableFunc1 = new RichTableFunc1
-    tEnv.registerFunction("RichTableFunc1", richTableFunc1)
-    val richFunc2 = new RichFunc2
-    tEnv.registerFunction("RichFunc2", richFunc2)
-    UserDefinedFunctionTestUtils.setJobParameters(
-      env,
-      Map("word_separator" -> "#", "string.value" -> "test"))
-
-    val result = CollectionDataSets.getSmall3TupleDataSet(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .join(richTableFunc1(richFunc2('c)) as 's)
-      .select('a, 's)
-
-    val expected = "1,Hi\n1,test\n2,Hello\n2,test\n3,Hello world\n3,test"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTableFunctionConstructorWithParams(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-    val func30 = new TableFunc3(null)
-    val func31 = new TableFunc3("OneConf_")
-    val func32 = new TableFunc3("TwoConf_")
-
-    val result = in
-      .join(func30('c) as('d, 'e))
-      .select('c, 'd, 'e)
-      .join(func31('c) as ('f, 'g))
-      .select('c, 'd, 'e, 'f, 'g)
-      .join(func32('c) as ('h, 'i))
-      .select('c, 'd, 'f, 'h, 'e, 'g, 'i)
-      .toDataSet[Row]
-
-    val results = result.collect()
-
-    val expected = "Anna#44,Anna,OneConf_Anna,TwoConf_Anna,44,44,44\n" +
-      "Jack#22,Jack,OneConf_Jack,TwoConf_Jack,22,22,22\n" +
-      "John#19,John,OneConf_John,TwoConf_John,19,19,19\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testScalarFunctionConstructorWithParams(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
-
-    val func0 = new Func13("default")
-    val func1 = new Func13("Sunny")
-    val func2 = new Func13("kevin2")
-
-    val result = in.select(func0('c), func1('c),func2('c))
-
-    val results = result.collect()
-
-    val expected = "default-Anna#44,Sunny-Anna#44,kevin2-Anna#44\n" +
-      "default-Jack#22,Sunny-Jack#22,kevin2-Jack#22\n" +
-      "default-John#19,Sunny-John#19,kevin2-John#19\n" +
-      "default-nosharp,Sunny-nosharp,kevin2-nosharp"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTableFunctionWithVariableArguments(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    val varArgsFunc0 = new VarArgsFunc0
-    tableEnv.registerFunction("VarArgsFunc0", varArgsFunc0)
-
-    val result = testData(env)
-      .toTable(tableEnv, 'a, 'b, 'c)
-      .select('c)
-      .join(varArgsFunc0("1", "2", 'c))
-
-    val expected = "Anna#44,1\n" +
-      "Anna#44,2\n" +
-      "Anna#44,Anna#44\n" +
-      "Jack#22,1\n" +
-      "Jack#22,2\n" +
-      "Jack#22,Jack#22\n" +
-      "John#19,1\n" +
-      "John#19,2\n" +
-      "John#19,John#19\n" +
-      "nosharp,1\n" +
-      "nosharp,2\n" +
-      "nosharp,nosharp"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-
-    // Test for empty cases
-    val result0 = testData(env)
-      .toTable(tableEnv, 'a, 'b, 'c)
-      .select('c)
-      .join(varArgsFunc0())
-    val results0 = result0.toDataSet[Row].collect()
-    assertTrue(results0.isEmpty)
-  }
-
-  private def testData(
-      env: ExecutionEnvironment)
-    : DataSet[(Int, Long, String)] = {
-
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Jack#22"))
-    data.+=((2, 2L, "John#19"))
-    data.+=((3, 2L, "Anna#44"))
-    data.+=((4, 3L, "nosharp"))
-    env.fromCollection(data)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/DataSetWindowAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/DataSetWindowAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/DataSetWindowAggregateITCase.scala
deleted file mode 100644
index e28acb8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/DataSetWindowAggregateITCase.scala
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.table
-
-import java.math.BigDecimal
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class DataSetWindowAggregateITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-    extends TableProgramsClusterTestBase(mode, configMode) {
-
-  val data = List(
-    (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
-    (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
-    (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
-    (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
-    (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
-    (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"),
-    (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"))
-
-  @Test(expected = classOf[UnsupportedOperationException])
-  def testAllEventTimeTumblingWindowOverCount(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-
-    // Count tumbling non-grouping window on event-time are currently not supported
-    table
-      .window(Tumble over 2.rows on 'long as 'w)
-      .groupBy('w)
-      .select('int.count)
-      .toDataSet[Row]
-  }
-
-  @Test
-  def testEventTimeTumblingGroupWindowOverCount(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 2.rows on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.sum, 'int.count, 'int.max, 'int.min, 'int.avg,
-              'double.sum, 'double.count, 'double.max, 'double.min, 'double.avg,
-              'float.sum, 'float.count, 'float.max, 'float.min, 'float.avg,
-              'bigdec.sum, 'bigdec.count, 'bigdec.max, 'bigdec.min, 'bigdec.avg)
-
-    val expected = "Hello,7,2,5,2,3,7.0,2,5.0,2.0,3.5,7.0,2,5.0,2.0,3.5,7,2,5,2,3.5\n" +
-      "Hello world,7,2,4,3,3,7.0,2,4.0,3.0,3.5,7.0,2,4.0,3.0,3.5,7,2,4,3,3.5\n"
-    val results = windowedTable.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.sum, 'w.start, 'w.end)
-
-    val expected = "Hello world,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" +
-      "Hello world,4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02\n" +
-      "Hello,7,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
-      "Hello,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" +
-      "Hallo,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n"
-
-    val results = windowedTable.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAllEventTimeTumblingWindowOverTime(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w)
-      .select('int.sum, 'w.start, 'w.end)
-
-    val expected = "10,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
-      "6,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" +
-      "4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02\n"
-
-    val results = windowedTable.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testEventTimeSessionGroupWindow(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-    val windowedTable = table
-      .window(Session withGap 7.milli on 'long as 'w)
-      .groupBy('string, 'w)
-      .select('string, 'string.count, 'w.start, 'w.end)
-
-    val results = windowedTable.toDataSet[Row].collect()
-
-    val expected = "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009\n" +
-      "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.015\n" +
-      "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.023\n" +
-      "Hello,3,1970-01-01 00:00:00.003,1970-01-01 00:00:00.014\n" +
-      "Hi,1,1970-01-01 00:00:00.001,1970-01-01 00:00:00.008"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAllEventTimeSessionGroupWindow(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val results =table
-      .window(Session withGap 2.milli on 'long as 'w)
-      .groupBy('w)
-      .select('string.count, 'w.start, 'w.end).toDataSet[Row].collect()
-
-    val expected = "4,1970-01-01 00:00:00.001,1970-01-01 00:00:00.006\n" +
-      "2,1970-01-01 00:00:00.007,1970-01-01 00:00:00.01\n" +
-      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.018"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testMultiGroupWindow(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-    table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-      .window( Slide over 5.milli every 1.milli on 'int as 'w2)
-      .groupBy('w2)
-      .select('string)
-      .toDataSet[Row]
-  }
-
-  // ----------------------------------------------------------------------------------------------
-  // Sliding windows
-  // ----------------------------------------------------------------------------------------------
-
-  @Test(expected = classOf[UnsupportedOperationException])
-  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-
-    // Count sliding group window on event-time are currently not supported
-    table
-      .window(Slide over 2.rows every 2.rows on 'long as 'w)
-      .groupBy('w)
-      .select('int.count)
-      .toDataSet[Row]
-  }
-
-  @Test
-  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
-    // please keep this test in sync with the DataStream variant
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Slide over 5.milli every 2.milli on 'long as 'w)
-      .groupBy('w)
-      .select('int.count, 'w.start, 'w.end)
-
-    val expected =
-      "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
-      "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
-      "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019\n" +
-      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
-      "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003\n" +
-      "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011\n" +
-      "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007\n" +
-      "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
-      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
-
-    val results = windowedTable.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
-    // please keep this test in sync with the DataStream variant
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Slide over 10.milli every 5.milli on 'long as 'w)
-      .groupBy('string, 'w)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val expected =
-      "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
-      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
-      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
-      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
-      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02\n" +
-      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025\n" +
-      "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
-      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
-      "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
-      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01"
-
-    val results = windowedTable.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
-    // please keep this test in sync with the DataStream variant
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Slide over 5.milli every 4.milli on 'long as 'w)
-      .groupBy('string, 'w)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val expected =
-      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
-      "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
-      "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
-      "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
-      "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
-      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
-      "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
-
-    val results = windowedTable.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = {
-    // please keep this test in sync with the DataStream variant
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Slide over 5.milli every 10.milli on 'long as 'w)
-      .groupBy('string, 'w)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val expected =
-      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
-      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
-
-    val results = windowedTable.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = {
-    // please keep this test in sync with the DataStream variant
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env
-      .fromCollection(data)
-      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Slide over 3.milli every 10.milli on 'long as 'w)
-      .groupBy('string, 'w)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val expected =
-      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003\n" +
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003"
-
-    val results = windowedTable.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/JoinITCase.scala
deleted file mode 100644
index 9743bf6..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/JoinITCase.scala
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.utils.TableFunc2
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class JoinITCase(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  @Test
-  def testJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithFilter(): Unit = {
-
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
-
-    val expected = "Hi,Hallo\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithJoinFilter(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e && 'a < 6).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
-      "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
-
-    val results = joinT.toDataSet[Row].collect()
-    val expected = "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-    "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    // use different table env in order to let tmp table ids are the same
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('a === 'd).select('g.count)
-
-    val expected = "6"
-    val results = joinT.toDataSet[Row] collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithGroupedAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2)
-      .where('a === 'd)
-      .groupBy('a, 'd)
-      .select('b.sum, 'g.count)
-
-    val expected = "6,3\n" + "4,2\n" + "1,1"
-    val results = joinT.toDataSet[Row] collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinPushThroughJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'j, 'k, 'l)
-
-    val joinT = ds1.join(ds2)
-      .where(Literal(true))
-      .join(ds3)
-      .where('a === 'd && 'e === 'k)
-      .select('a, 'f, 'l)
-
-    val expected = "2,1,Hello\n" + "2,1,Hello world\n" + "1,0,Hi"
-    val results = joinT.toDataSet[Row] collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithDisjunctivePred(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10)).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" +
-      "Hello,Hallo Welt\n" +
-      "I am fine.,IJK"
-    val results = joinT.toDataSet[Row] collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithExpressionPreds(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
-
-    val expected = "I am fine.,Hallo Welt\n" +
-      "Luke Skywalker,Hallo Welt wie gehts?\n" +
-      "Luke Skywalker,ABC\n" +
-      "Comment#2,HIJ\n" +
-      "Comment#2,IJK"
-    val results = joinT.toDataSet[Row] collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testLeftJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-      "Hello world,ABC\n" + "Hello world, how are you?,null\n" + "I am fine.,HIJ\n" +
-      "I am fine.,IJK\n" + "Luke Skywalker,null\n" + "Comment#1,null\n" + "Comment#2,null\n" +
-      "Comment#3,null\n" + "Comment#4,null\n" + "Comment#5,null\n" + "Comment#6,null\n" +
-      "Comment#7,null\n" + "Comment#8,null\n" + "Comment#9,null\n" + "Comment#10,null\n" +
-      "Comment#11,null\n" + "Comment#12,null\n" + "Comment#13,null\n" + "Comment#14,null\n" +
-      "Comment#15,null\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testRightJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" +
-      "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + "null,BCD\n" + "null,CDE\n" +
-      "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "I am fine.,HIJ\n" +
-      "I am fine.,IJK\n" + "null,JKL\n" + "null,KLM\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFullOuterJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" +
-      "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + "null,BCD\n" + "null,CDE\n" +
-      "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "I am fine.,HIJ\n" +
-      "I am fine.,IJK\n" + "null,JKL\n" + "null,KLM\n" + "Luke Skywalker,null\n" +
-      "Comment#1,null\n" + "Comment#2,null\n" + "Comment#3,null\n" + "Comment#4,null\n" +
-      "Comment#5,null\n" + "Comment#6,null\n" + "Comment#7,null\n" + "Comment#8,null\n" +
-      "Comment#9,null\n" + "Comment#10,null\n" + "Comment#11,null\n" + "Comment#12,null\n" +
-      "Comment#13,null\n" + "Comment#14,null\n" + "Comment#15,null\n" +
-      "Hello world, how are you?,null\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUDTFJoinOnTuples(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data = List("hi#world", "how#are#you")
-
-    val ds1 = env.fromCollection(data).toTable(tEnv, 'a)
-    val func2 = new TableFunc2
-
-    val joinDs = ds1.join(func2('a) as ('name, 'len))
-
-    val results = joinDs.toDataSet[Row].collect()
-    val expected = Seq(
-      "hi#world,hi,2",
-      "hi#world,world,5",
-      "how#are#you,how,3",
-      "how#are#you,are,3",
-      "how#are#you,you,3").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}


Mime
View raw message