flink-commits mailing list archives

From twal...@apache.org
Subject [17/44] flink git commit: [FLINK-6617][table] Improve consistency of Java and Scala logical plan tests
Date Thu, 13 Jul 2017 10:18:26 GMT
[FLINK-6617][table] Improve consistency of Java and Scala logical plan tests

This closes #3943.
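
All of the moved tests share the plan-verification pattern of TableTestBase: a query is translated and its logical plan is compared against an expected plan string, which is what lets the Java and Scala API variants be checked for identical plans. A minimal sketch of that pattern follows, built only from helpers visible in the deleted AggregationTest.scala below (batchTestUtil, addTable, batchTableNode, unaryNode, term, verifySql); the class name and the exact expected plan string are illustrative, not taken from the commit.

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test

class PlanConsistencySketch extends TableTestBase {

  @Test
  def testGroupedAggregationPlan(): Unit = {
    val util = batchTestUtil()
    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)

    // Expected logical plan, expressed with the same helper vocabulary
    // used throughout the tests in this commit.
    val expected = unaryNode(
      "DataSetAggregate",
      batchTableNode(0),
      term("groupBy", "a"),
      term("select", "a", "SUM(b) AS EXPR$1"))

    // The SQL variant is verified against the expected plan; a Table API
    // variant would call util.verifyTable(...) with its own expected string.
    util.verifySql("SELECT a, SUM(b) FROM MyTable GROUP BY a", expected)
  }
}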


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ecde7bc1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ecde7bc1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ecde7bc1

Branch: refs/heads/master
Commit: ecde7bc13c992e81a50d4f9b897ba4840709629c
Parents: 9bfc927
Author: sunjincheng121 <sunjincheng121@gmail.com>
Authored: Thu May 18 13:02:24 2017 +0800
Committer: twalthr <twalthr@apache.org>
Committed: Thu Jul 13 12:17:41 2017 +0200

----------------------------------------------------------------------
 .../table/api/java/stream/sql/SqlITCase.java    |   4 +-
 .../apache/flink/table/AggregationTest.scala    | 263 -----
 .../flink/table/CalciteConfigBuilderTest.scala  | 403 --------
 .../flink/table/CompositeFlatteningTest.scala   | 147 ---
 .../flink/table/ExpressionReductionTest.scala   |   8 +-
 .../flink/table/ExternalCatalogTest.scala       | 194 ----
 .../flink/table/TableEnvironmentTest.scala      | 394 --------
 .../apache/flink/table/TableSchemaTest.scala    |  82 --
 .../apache/flink/table/TableSourceTest.scala    | 421 --------
 .../table/api/scala/batch/ExplainTest.scala     | 123 ---
 .../scala/batch/TableEnvironmentITCase.scala    | 291 ------
 .../api/scala/batch/TableEnvironmentTest.scala  | 318 ++++++
 .../table/api/scala/batch/TableSchemaTest.scala |  73 ++
 .../table/api/scala/batch/TableSinkITCase.scala |  69 --
 .../api/scala/batch/TableSourceITCase.scala     | 121 ---
 .../scala/batch/sql/AggregationsITCase.scala    | 394 --------
 .../api/scala/batch/sql/AggregationsTest.scala  | 141 ++-
 .../table/api/scala/batch/sql/CalcITCase.scala  | 327 -------
 .../batch/sql/CompositeFlatteningTest.scala     |  52 +
 .../batch/sql/ExpressionReductionTest.scala     | 136 +++
 .../table/api/scala/batch/sql/JoinITCase.scala  | 700 --------------
 .../batch/sql/QueryDecorrelationTest.scala      |  18 +-
 .../scala/batch/sql/SetOperatorsITCase.scala    | 264 -----
 .../table/api/scala/batch/sql/SortITCase.scala  | 199 ----
 .../scala/batch/sql/TableWithSQLITCase.scala    | 116 ---
 .../scala/batch/sql/WindowAggregateTest.scala   |  82 +-
 .../validation/AggregationsValidationTest.scala |  43 +
 .../sql/validation/CalcValidationTest.scala     |  38 +
 .../sql/validation/JoinValidationTest.scala     | 149 +++
 .../sql/validation/SortValidationTest.scala     |  38 +
 .../WindowAggregateValidationTest.scala         |  96 ++
 .../scala/batch/table/AggregationsITCase.scala  | 379 --------
 .../scala/batch/table/AggregationsTest.scala    | 132 +++
 .../api/scala/batch/table/CalcITCase.scala      | 381 --------
 .../api/scala/batch/table/CastingITCase.scala   | 110 ---
 .../batch/table/CompositeFlatteningTest.scala   | 116 +++
 .../api/scala/batch/table/ExplainTest.scala     | 123 +++
 .../batch/table/ExpressionReductionTest.scala   | 119 +++
 .../scala/batch/table/FieldProjectionTest.scala |  79 +-
 .../api/scala/batch/table/GroupWindowTest.scala | 151 +--
 .../api/scala/batch/table/JoinITCase.scala      | 371 -------
 .../scala/batch/table/SetOperatorsITCase.scala  | 228 -----
 .../api/scala/batch/table/SortITCase.scala      | 226 -----
 .../table/UserDefinedTableFunctionTest.scala    |  86 +-
 .../AggregationsStringExpressionTest.scala      | 136 +--
 .../stringexpr/CalcStringExpressionTest.scala   | 131 +--
 .../CastingStringExpressionTest.scala           |   4 +-
 .../stringexpr/JoinStringExpressionTest.scala   | 107 +--
 ...finedTableFunctionStringExpressionTest.scala |  90 ++
 .../validation/AggregationsValidationTest.scala | 187 ++--
 .../table/validation/CalcValidationTest.scala   |  68 +-
 .../CompositeFlatteningValidationTest.scala     |  36 +
 .../validation/GroupWindowValidationTest.scala  | 185 ++++
 .../table/validation/JoinValidationTest.scala   | 178 ++--
 .../validation/SetOperatorsValidationTest.scala |  35 +-
 .../table/validation/SortValidationTest.scala   |  23 +-
 .../utils/TableProgramsClusterTestBase.scala    |   3 +-
 .../TableEnvironmentValidationTest.scala        | 244 +++++
 .../api/scala/stream/ExplainStreamTest.scala    |  74 --
 .../api/scala/stream/RetractionITCase.scala     | 194 ----
 .../stream/StreamTableEnvironmentTest.scala     | 171 ----
 .../api/scala/stream/TableEnvironmentTest.scala | 120 +++
 .../api/scala/stream/TableSchemaTest.scala      |  50 +
 .../api/scala/stream/TableSinkITCase.scala      |  63 --
 .../api/scala/stream/TableSourceITCase.scala    | 105 --
 .../api/scala/stream/TableSourceTest.scala      | 176 ----
 .../api/scala/stream/sql/AggregationsTest.scala | 141 ---
 .../stream/sql/ExpressionReductionTest.scala    | 136 +++
 .../api/scala/stream/sql/OverWindowITCase.scala | 864 -----------------
 .../table/api/scala/stream/sql/SqlITCase.scala  | 319 ------
 .../scala/stream/sql/WindowAggregateTest.scala  |  67 +-
 .../validation/AggregationsValidationTest.scala | 141 +++
 .../validation/OverWindowValidationTest.scala   |  45 +
 .../WindowAggregateValidationTest.scala         |  90 ++
 .../api/scala/stream/table/CalcITCase.scala     | 270 ------
 .../api/scala/stream/table/ExplainTest.scala    |  74 ++
 .../stream/table/ExpressionReductionTest.scala  | 119 +++
 .../stream/table/FieldProjectionTest.scala      |  98 ++
 .../stream/table/GroupAggregationsITCase.scala  | 157 ---
 .../stream/table/GroupAggregationsTest.scala    |  75 +-
 .../table/GroupWindowAggregationsITCase.scala   |   2 +-
 .../table/GroupWindowAggregationsTest.scala     | 785 +++++++++++++++
 .../scala/stream/table/GroupWindowTest.scala    | 963 -------------------
 .../scala/stream/table/OverWindowITCase.scala   | 347 -------
 .../api/scala/stream/table/OverWindowTest.scala |  93 +-
 .../scala/stream/table/TableSourceTest.scala    | 156 +++
 .../api/scala/stream/table/UnionITCase.scala    | 124 ---
 .../scala/stream/table/UnsupportedOpsTest.scala |  98 --
 .../table/UserDefinedTableFunctionTest.scala    | 264 -----
 .../stringexpr/CalcStringExpressionTest.scala   | 126 +++
 .../GroupAggregationsStringExpressionTest.scala |  63 ++
 .../GroupWindowStringExpressionTest.scala       | 199 +++-
 .../OverWindowStringExpressionTest.scala        |  59 +-
 .../stringexpr/UnionStringExpressionTest.scala  |  48 +
 ...finedTableFunctionStringExpressionTest.scala |  91 ++
 .../table/validation/CalcValidationTest.scala   |  84 ++
 .../GroupAggregationsValidationTest.scala       |  63 ++
 .../GroupWindowAggregationsValidationTest.scala | 207 ++++
 .../validation/OverWindowValidationTest.scala   | 117 +++
 .../validation/TableSinksValidationTest.scala   |  64 ++
 .../validation/TableSourceValidationTest.scala  |  50 +
 .../TimeAttributesValidationTest.scala          |  79 ++
 .../table/validation/UnionValidationTest.scala  |  84 ++
 .../UnsupportedOpsValidationTest.scala          |  98 ++
 ...UserDefinedTableFunctionValidationTest.scala | 199 ++++
 .../api/scala/stream/utils/StreamITCase.scala   |  83 --
 .../utils/StreamingWithStateTestBase.scala      |  40 -
 .../TableEnvironmentValidationTest.scala        |  78 ++
 .../validation/TableSchemaValidationTest.scala  |  35 +
 .../calcite/CalciteConfigBuilderTest.scala      | 402 ++++++++
 .../calcite/RelTimeIndicatorConverterTest.scala | 398 --------
 .../sql/RelTimeIndicatorConverterTest.scala     | 146 +++
 .../table/RelTimeIndicatorConverterTest.scala   | 286 ++++++
 .../catalog/batch/sql/ExternalCatalogTest.scala |  56 ++
 .../batch/table/ExternalCatalogTest.scala       |  94 ++
 .../stream/sql/ExternalCatalogTest.scala        |  57 ++
 .../stream/table/ExternalCatalogTest.scala      |  65 ++
 .../catalog/utils/ExternalCatalogTestBase.scala |  38 +
 .../flink/table/expressions/ArrayTypeTest.scala | 112 +--
 .../table/expressions/CompositeAccessTest.scala |  91 +-
 .../flink/table/expressions/MapTypeTest.scala   |  41 +-
 .../table/expressions/ScalarFunctionsTest.scala | 103 +-
 .../table/expressions/ScalarOperatorsTest.scala |  72 +-
 .../batch/sql/ExpressionReductionTest.scala     | 154 +++
 .../batch/table/ExpressionReductionTest.scala   | 119 +++
 .../stream/sql/ExpressionReductionTest.scala    | 154 +++
 .../stream/table/ExpressionReductionTest.scala  | 137 +++
 .../expressions/utils/ArrayTypeTestBase.scala   |  65 ++
 .../utils/CompositeAccessTestBase.scala         |  79 ++
 .../expressions/utils/MapTypeTestBase.scala     |  53 +
 .../utils/ScalarFunctionsTestBase.scala         | 105 ++
 .../utils/ScalarOperatorsTestBase.scala         |  69 ++
 .../validation/ArrayTypeValidationTest.scala    |  91 ++
 .../CompositeAccessValidationTest.scala         |  60 ++
 .../validation/MapTypeValidationTest.scala      |  31 +
 .../ScalarFunctionsValidationTest.scala         |  44 +
 .../ScalarOperatorsValidationTest.scala         |  48 +
 .../plan/rules/NormalizationRulesTest.scala     |   4 +-
 .../table/plan/rules/RetractionRulesTest.scala  |   4 +-
 .../runtime/dataset/DataSetCalcITCase.scala     | 105 --
 .../DataSetUserDefinedFunctionITCase.scala      | 346 -------
 .../dataset/DataSetWindowAggregateITCase.scala  | 357 -------
 .../dataset/sql/AggregationsITCase.scala        | 394 ++++++++
 .../table/runtime/dataset/sql/CalcITCase.scala  | 323 +++++++
 .../table/runtime/dataset/sql/JoinITCase.scala  | 521 ++++++++++
 .../dataset/sql/SetOperatorsITCase.scala        | 264 +++++
 .../table/runtime/dataset/sql/SortITCase.scala  | 186 ++++
 .../runtime/dataset/sql/TableSourceITCase.scala |  81 ++
 .../dataset/sql/TableWithSQLITCase.scala        | 116 +++
 .../dataset/table/AggregationsITCase.scala      | 379 ++++++++
 .../runtime/dataset/table/CalcITCase.scala      | 381 ++++++++
 .../runtime/dataset/table/CastingITCase.scala   | 110 +++
 .../dataset/table/DataSetCalcITCase.scala       | 105 ++
 .../DataSetUserDefinedFunctionITCase.scala      | 346 +++++++
 .../table/DataSetWindowAggregateITCase.scala    | 357 +++++++
 .../runtime/dataset/table/JoinITCase.scala      | 298 ++++++
 .../dataset/table/SetOperatorsITCase.scala      | 228 +++++
 .../runtime/dataset/table/SortITCase.scala      | 226 +++++
 .../dataset/table/TableEnvironmentITCase.scala  | 179 ++++
 .../runtime/dataset/table/TableSinkITCase.scala |  69 ++
 .../dataset/table/TableSourceITCase.scala       |  79 ++
 .../datastream/DataStreamAggregateITCase.scala  | 262 -----
 .../datastream/DataStreamCalcITCase.scala       |  81 --
 .../DataStreamUserDefinedFunctionITCase.scala   | 245 -----
 .../table/runtime/datastream/StreamITCase.scala |  82 ++
 .../datastream/StreamingWithStateTestBase.scala |  39 +
 .../datastream/TimeAttributesITCase.scala       | 444 ---------
 .../datastream/sql/OverWindowITCase.scala       | 838 ++++++++++++++++
 .../runtime/datastream/sql/SqlITCase.scala      | 319 ++++++
 .../datastream/sql/TableSourceITCase.scala      |  61 ++
 .../runtime/datastream/table/CalcITCase.scala   | 269 ++++++
 .../table/GroupAggregationsITCase.scala         | 158 +++
 .../table/GroupWindowAggregationsITCase.scala   | 446 +++++++++
 .../datastream/table/OverWindowITCase.scala     | 347 +++++++
 .../datastream/table/RetractionITCase.scala     | 193 ++++
 .../datastream/table/TableSinksITCase.scala     | 510 ++++++++++
 .../datastream/table/TableSourceITCase.scala    |  80 ++
 .../datastream/table/TimeAttributesITCase.scala | 407 ++++++++
 .../runtime/datastream/table/UnionITCase.scala  |  75 ++
 .../table/UserDefinedFunctionITCase.scala       | 246 +++++
 .../table/sinks/StreamTableSinksITCase.scala    | 511 ----------
 .../validation/TableSinksValidationTest.scala   |  45 +
 .../flink/table/sources/TableSourceTest.scala   |  84 ++
 .../table/sources/TableSourceTestBase.scala     |  61 ++
 .../table/sources/batch/TableSourceTest.scala   | 421 ++++++++
 .../sources/stream/sql/TableSourceTest.scala    |  45 +
 .../sources/stream/table/TableSourceTest.scala  |  91 ++
 .../validation/TableSourceBalidationTest.scala  |  52 +
 .../flink/table/utils/TableTestBase.scala       |  82 +-
 189 files changed, 18022 insertions(+), 14980 deletions(-)
----------------------------------------------------------------------
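
The diffstat reflects the new layout: plan tests sit next to the API they exercise, expected-failure tests move into validation packages, string-vs-Scala expression tests into stringexpr packages, and integration tests into runtime/dataset and runtime/datastream. As a rough sketch of the validation style (class and method names here are hypothetical; ValidationException is the exception imported by the deleted CompositeFlatteningTest below, and eager rejection of the unresolved field is an assumption about this code base):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test

class CalcValidationSketch extends TableTestBase {

  // Validation tests assert that an invalid query is rejected during
  // translation instead of comparing plan strings.
  @Test(expected = classOf[ValidationException])
  def testSelectNonExistingField(): Unit = {
    val util = batchTestUtil()
    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

    // 'd is not a field of MyTable, so field resolution should fail.
    table.select('d + 1)
  }
}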


http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index 4f32382..68719b8d 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.api.java.stream.utils.StreamTestData;
-import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.table.runtime.datastream.StreamITCase;
 import org.apache.flink.types.Row;
 
 import org.junit.Test;
@@ -38,6 +38,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
+
+
 /**
  * Integration tests for streaming SQL.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala
deleted file mode 100644
index aad3403..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
-  * Test for testing aggregate plans.
-  */
-class AggregationTest extends TableTestBase {
-
-  @Test
-  def testAggregateQueryBatchSQL(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable"
-
-    val setValues = unaryNode(
-      "DataSetValues",
-      batchTableNode(0),
-      tuples(List(null,null,null)),
-      term("values","a","b","c")
-    )
-    val union = unaryNode(
-      "DataSetUnion",
-      setValues,
-      term("union","a","b","c")
-    )
-
-    val aggregate = unaryNode(
-      "DataSetAggregate",
-      union,
-      term("select",
-        "AVG(a) AS EXPR$0",
-        "SUM(b) AS EXPR$1",
-        "COUNT(c) AS EXPR$2")
-    )
-    util.verifySql(sqlQuery, aggregate)
-  }
-
-  @Test
-  def testAggregateWithFilterQueryBatchSQL(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1"
-
-    val calcNode = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", "=(a, 1)")
-    )
-
-    val setValues =  unaryNode(
-        "DataSetValues",
-        calcNode,
-        tuples(List(null,null,null)),
-        term("values","a","b","c")
-    )
-
-    val union = unaryNode(
-      "DataSetUnion",
-      setValues,
-      term("union","a","b","c")
-    )
-
-    val aggregate = unaryNode(
-      "DataSetAggregate",
-      union,
-      term("select",
-        "AVG(a) AS EXPR$0",
-        "SUM(b) AS EXPR$1",
-        "COUNT(c) AS EXPR$2")
-    )
-    util.verifySql(sqlQuery, aggregate)
-  }
-
-  @Test
-  def testAggregateGroupQueryBatchSQL(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable GROUP BY a"
-
-    val aggregate = unaryNode(
-        "DataSetAggregate",
-        batchTableNode(0),
-        term("groupBy", "a"),
-        term("select",
-          "a",
-          "AVG(a) AS EXPR$0",
-          "SUM(b) AS EXPR$1",
-          "COUNT(c) AS EXPR$2")
-    )
-    val expected = unaryNode(
-        "DataSetCalc",
-        aggregate,
-        term("select",
-          "EXPR$0",
-          "EXPR$1",
-          "EXPR$2")
-    )
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testAggregateGroupWithFilterQueryBatchSQL(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1 GROUP BY a"
-
-    val calcNode = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select","a", "b", "c") ,
-      term("where","=(a, 1)")
-    )
-
-    val aggregate = unaryNode(
-        "DataSetAggregate",
-        calcNode,
-        term("groupBy", "a"),
-        term("select",
-          "a",
-          "AVG(a) AS EXPR$0",
-          "SUM(b) AS EXPR$1",
-          "COUNT(c) AS EXPR$2")
-    )
-    val expected = unaryNode(
-        "DataSetCalc",
-        aggregate,
-        term("select",
-          "EXPR$0",
-          "EXPR$1",
-          "EXPR$2")
-    )
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testAggregateGroupWithFilterTableApi(): Unit = {
-
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val resultTable = sourceTable.groupBy('a)
-      .select('a, 'a.avg, 'b.sum, 'c.count)
-      .where('a === 1)
-
-    val calcNode = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", "=(a, 1)")
-    )
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      calcNode,
-      term("groupBy", "a"),
-      term("select",
-        "a",
-        "AVG(a) AS TMP_0",
-        "SUM(b) AS TMP_1",
-        "COUNT(c) AS TMP_2")
-    )
-
-    util.verifyTable(resultTable,expected)
-  }
-
-  @Test
-  def testAggregateTableApi(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-    val resultTable = sourceTable.select('a.avg,'b.sum,'c.count)
-
-    val setValues = unaryNode(
-      "DataSetValues",
-      batchTableNode(0),
-      tuples(List(null,null,null)),
-      term("values","a","b","c")
-    )
-    val union = unaryNode(
-      "DataSetUnion",
-      setValues,
-      term("union","a","b","c")
-    )
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      union,
-      term("select",
-        "AVG(a) AS TMP_0",
-        "SUM(b) AS TMP_1",
-        "COUNT(c) AS TMP_2")
-    )
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testAggregateWithFilterTableApi(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val resultTable = sourceTable.select('a,'b,'c).where('a === 1)
-      .select('a.avg,'b.sum,'c.count)
-
-    val calcNode = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      // ReduceExpressionsRule will add cast for Project node by force
-      // if the input of the Project node has constant expression.
-      term("select", "CAST(1) AS a", "b", "c"),
-      term("where", "=(a, 1)")
-    )
-
-    val setValues =  unaryNode(
-      "DataSetValues",
-      calcNode,
-      tuples(List(null,null,null)),
-      term("values","a","b","c")
-    )
-
-    val union = unaryNode(
-      "DataSetUnion",
-      setValues,
-      term("union","a","b","c")
-    )
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      union,
-      term("select",
-        "AVG(a) AS TMP_0",
-        "SUM(b) AS TMP_1",
-        "COUNT(c) AS TMP_2")
-    )
-
-    util.verifyTable(resultTable, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
deleted file mode 100644
index 0c337e0..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table
-
-import org.apache.calcite.rel.rules._
-import org.apache.calcite.sql.fun.{OracleSqlOperatorTable, SqlStdOperatorTable}
-import org.apache.calcite.tools.RuleSets
-import org.apache.flink.table.calcite.{CalciteConfig, CalciteConfigBuilder}
-import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.JavaConverters._
-
-class CalciteConfigBuilderTest {
-
-  @Test
-  def testDefaultRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder().build()
-
-    assertFalse(cc.replacesNormRuleSet)
-    assertFalse(cc.getNormRuleSet.isDefined)
-
-    assertFalse(cc.replacesLogicalOptRuleSet)
-    assertFalse(cc.getLogicalOptRuleSet.isDefined)
-
-    assertFalse(cc.replacesPhysicalOptRuleSet)
-    assertFalse(cc.getPhysicalOptRuleSet.isDefined)
-
-    assertFalse(cc.replacesDecoRuleSet)
-    assertFalse(cc.getDecoRuleSet.isDefined)
-  }
-
-  @Test
-  def testRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-      .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .replaceDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
-      .build()
-
-    assertFalse(cc.replacesNormRuleSet)
-    assertTrue(cc.getNormRuleSet.isDefined)
-
-    assertTrue(cc.replacesLogicalOptRuleSet)
-    assertTrue(cc.getLogicalOptRuleSet.isDefined)
-
-    assertTrue(cc.replacesPhysicalOptRuleSet)
-    assertTrue(cc.getPhysicalOptRuleSet.isDefined)
-
-    assertTrue(cc.replacesDecoRuleSet)
-    assertTrue(cc.getDecoRuleSet.isDefined)
-  }
-
-  @Test
-  def testReplaceNormalizationRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-      .build()
-
-    assertEquals(true, cc.replacesNormRuleSet)
-    assertTrue(cc.getNormRuleSet.isDefined)
-    val cSet = cc.getNormRuleSet.get.iterator().asScala.toSet
-    assertEquals(1, cSet.size)
-    assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
-  }
-
-  @Test
-  def testReplaceNormalizationAddRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-      .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE))
-      .build()
-
-    assertEquals(true, cc.replacesNormRuleSet)
-    assertTrue(cc.getNormRuleSet.isDefined)
-    val cSet = cc.getNormRuleSet.get.iterator().asScala.toSet
-    assertEquals(2, cSet.size)
-    assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
-    assertTrue(cSet.contains(ReduceExpressionsRule.PROJECT_INSTANCE))
-  }
-
-  @Test
-  def testAddNormalizationRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-      .build()
-
-    assertEquals(false, cc.replacesNormRuleSet)
-    assertTrue(cc.getNormRuleSet.isDefined)
-    val cSet = cc.getNormRuleSet.get.iterator().asScala.toSet
-    assertEquals(1, cSet.size)
-    assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
-  }
-
-  @Test
-  def testAddAddNormalizationRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-      .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE,
-        ReduceExpressionsRule.CALC_INSTANCE))
-      .build()
-
-    assertEquals(false, cc.replacesNormRuleSet)
-    assertTrue(cc.getNormRuleSet.isDefined)
-    val cList = cc.getNormRuleSet.get.iterator().asScala.toList
-    assertEquals(3, cList.size)
-    assertEquals(cList.head, ReduceExpressionsRule.FILTER_INSTANCE)
-    assertEquals(cList(1), ReduceExpressionsRule.PROJECT_INSTANCE)
-    assertEquals(cList(2), ReduceExpressionsRule.CALC_INSTANCE)
-  }
-
-  @Test
-  def testReplaceLogicalOptimizationRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-        .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-        .build()
-
-    assertEquals(true, cc.replacesLogicalOptRuleSet)
-    assertTrue(cc.getLogicalOptRuleSet.isDefined)
-    val cSet = cc.getLogicalOptRuleSet.get.iterator().asScala.toSet
-    assertEquals(1, cSet.size)
-    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
-  }
-
-  @Test
-  def testReplaceLogicalOptimizationAddRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-        .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-        .addLogicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
-        .build()
-
-    assertEquals(true, cc.replacesLogicalOptRuleSet)
-    assertTrue(cc.getLogicalOptRuleSet.isDefined)
-    val cSet = cc.getLogicalOptRuleSet.get.iterator().asScala.toSet
-    assertEquals(3, cSet.size)
-    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
-    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
-    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
-  }
-
-  @Test
-  def testAddLogicalOptimizationRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-        .addLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-        .addLogicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
-        .build()
-
-    assertEquals(false, cc.replacesLogicalOptRuleSet)
-    assertTrue(cc.getLogicalOptRuleSet.isDefined)
-    val cSet = cc.getLogicalOptRuleSet.get.iterator().asScala.toSet
-    assertEquals(3, cSet.size)
-    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
-    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
-    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
-  }
-
-  @Test
-  def testReplacePhysicalOptimizationRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-        .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-        .build()
-
-    assertEquals(true, cc.replacesPhysicalOptRuleSet)
-    assertTrue(cc.getPhysicalOptRuleSet.isDefined)
-    val cSet = cc.getPhysicalOptRuleSet.get.iterator().asScala.toSet
-    assertEquals(1, cSet.size)
-    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
-  }
-
-  @Test
-  def testReplacePhysicalOptimizationAddRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-        .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-        .addPhysicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
-        .build()
-
-    assertEquals(true, cc.replacesPhysicalOptRuleSet)
-    assertTrue(cc.getPhysicalOptRuleSet.isDefined)
-    val cSet = cc.getPhysicalOptRuleSet.get.iterator().asScala.toSet
-    assertEquals(3, cSet.size)
-    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
-    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
-    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
-  }
-
-  @Test
-  def testAddPhysicalOptimizationRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-        .addPhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-        .addPhysicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
-        .build()
-
-    assertEquals(false, cc.replacesPhysicalOptRuleSet)
-    assertTrue(cc.getPhysicalOptRuleSet.isDefined)
-    val cSet = cc.getPhysicalOptRuleSet.get.iterator().asScala.toSet
-    assertEquals(3, cSet.size)
-    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
-    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
-    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
-  }
-
-  @Test
-  def testReplaceDecorationRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
-      .build()
-
-    assertEquals(true, cc.replacesDecoRuleSet)
-    assertTrue(cc.getDecoRuleSet.isDefined)
-    val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
-    assertEquals(1, cSet.size)
-    assertTrue(cSet.contains(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
-  }
-
-  @Test
-  def testReplaceDecorationAddRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
-      .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE))
-      .build()
-
-    assertEquals(true, cc.replacesDecoRuleSet)
-    assertTrue(cc.getDecoRuleSet.isDefined)
-    val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
-    assertEquals(2, cSet.size)
-    assertTrue(cSet.contains(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
-    assertTrue(cSet.contains(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE))
-  }
-
-  @Test
-  def testAddDecorationRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
-      .build()
-
-    assertEquals(false, cc.replacesDecoRuleSet)
-    assertTrue(cc.getDecoRuleSet.isDefined)
-    val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
-    assertEquals(1, cSet.size)
-    assertTrue(cSet.contains(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
-  }
-
-  @Test
-  def testAddAddDecorationRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE))
-      .addDecoRuleSet(RuleSets.ofList(DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE,
-                                      DataStreamRetractionRules.ACCMODE_INSTANCE))
-      .build()
-
-    assertEquals(false, cc.replacesDecoRuleSet)
-    assertTrue(cc.getDecoRuleSet.isDefined)
-    val cList = cc.getDecoRuleSet.get.iterator().asScala.toList
-    assertEquals(3, cList.size)
-    assertEquals(cList.head, DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE)
-    assertEquals(cList(1), DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE)
-    assertEquals(cList(2), DataStreamRetractionRules.ACCMODE_INSTANCE)
-  }
-
-  @Test
-  def testDefaultOperatorTable(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .build()
-
-    assertEquals(false, cc.replacesSqlOperatorTable)
-    assertFalse(cc.getSqlOperatorTable.isDefined)
-  }
-
-  @Test
-  def testReplaceOperatorTable(): Unit = {
-
-    val oracleTable = new OracleSqlOperatorTable
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceSqlOperatorTable(oracleTable)
-      .build()
-
-    val oracleOps = oracleTable.getOperatorList.asScala
-
-    assertEquals(true, cc.replacesSqlOperatorTable)
-    assertTrue(cc.getSqlOperatorTable.isDefined)
-    val ops = cc.getSqlOperatorTable.get.getOperatorList
-      .asScala.toSet
-    assertEquals(oracleOps.size, ops.size)
-    for (o <- oracleOps) {
-      assertTrue(ops.contains(o))
-    }
-  }
-
-  @Test
-  def testReplaceAddOperatorTable(): Unit = {
-
-    val oracleTable = new OracleSqlOperatorTable
-    val stdTable = new SqlStdOperatorTable
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceSqlOperatorTable(oracleTable)
-      .addSqlOperatorTable(stdTable)
-      .build()
-
-    val oracleOps = oracleTable.getOperatorList.asScala
-    val stdOps = stdTable.getOperatorList.asScala
-
-    assertEquals(true, cc.replacesSqlOperatorTable)
-    assertTrue(cc.getSqlOperatorTable.isDefined)
-    val ops = cc.getSqlOperatorTable.get.getOperatorList
-      .asScala.toSet
-    assertEquals(oracleOps.size + stdOps.size, ops.size)
-    for (o <- oracleOps) {
-      assertTrue(ops.contains(o))
-    }
-    for (o <- stdOps) {
-      assertTrue(ops.contains(o))
-    }
-
-  }
-
-  @Test
-  def testAddOperatorTable(): Unit = {
-
-    val oracleTable = new OracleSqlOperatorTable
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addSqlOperatorTable(oracleTable)
-      .build()
-
-    val oracleOps = oracleTable.getOperatorList.asScala
-
-    assertEquals(false, cc.replacesSqlOperatorTable)
-    assertTrue(cc.getSqlOperatorTable.isDefined)
-    val ops = cc.getSqlOperatorTable.get.getOperatorList
-      .asScala.toSet
-    assertEquals(oracleOps.size, ops.size)
-    for (o <- oracleOps) {
-      assertTrue(ops.contains(o))
-    }
-  }
-
-  @Test
-  def testAddAddOperatorTable(): Unit = {
-
-    val oracleTable = new OracleSqlOperatorTable
-    val stdTable = new SqlStdOperatorTable
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addSqlOperatorTable(oracleTable)
-      .addSqlOperatorTable(stdTable)
-      .build()
-
-    val oracleOps = oracleTable.getOperatorList.asScala
-    val stdOps = stdTable.getOperatorList.asScala
-
-    assertEquals(false, cc.replacesSqlOperatorTable)
-    assertTrue(cc.getSqlOperatorTable.isDefined)
-    val ops = cc.getSqlOperatorTable.get.getOperatorList
-      .asScala.toSet
-    assertEquals(oracleOps.size + stdOps.size, ops.size)
-    for (o <- oracleOps) {
-      assertTrue(ops.contains(o))
-    }
-    for (o <- stdOps) {
-      assertTrue(ops.contains(o))
-    }
-
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala
deleted file mode 100644
index f5f5ff1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.CompositeFlatteningTest.{TestCaseClass, giveMeCaseClass}
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-
-class CompositeFlatteningTest extends TableTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testDuplicateFlattening(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
-    table.select('a.flatten(), 'a.flatten())
-  }
-
-  @Test
-  def testMultipleFlatteningsTable(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table.select('a.flatten(), 'c, 'b.flatten())
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "a._1 AS a$_1",
-        "a._2 AS a$_2",
-        "c",
-        "b._1 AS b$_1",
-        "b._2 AS b$_2"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testMultipleFlatteningsSql(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "a._1 AS _1",
-        "a._2 AS _2",
-        "c",
-        "b._1 AS _10",
-        "b._2 AS _20"
-      )
-    )
-
-    util.verifySql(
-      "SELECT MyTable.a.*, c, MyTable.b.* FROM MyTable",
-      expected)
-  }
-
-  @Test
-  def testNestedFlattenings(): Unit = {
-    val util = batchTestUtil()
-    val table = util
-      .addTable[((((String, TestCaseClass), Boolean), String), String)]("MyTable", 'a, 'b)
-
-    val result = table.select('a.flatten(), 'b.flatten())
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "a._1 AS a$_1",
-        "a._2 AS a$_2",
-        "b"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testScalarFunctionAccess(): Unit = {
-    val util = batchTestUtil()
-    val table = util
-      .addTable[(String, Int)]("MyTable", 'a, 'b)
-
-    val result = table.select(
-      giveMeCaseClass().get("my"),
-      giveMeCaseClass().get("clazz"),
-      giveMeCaseClass().flatten())
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        s"${giveMeCaseClass.functionIdentifier}().my AS _c0",
-        s"${giveMeCaseClass.functionIdentifier}().clazz AS _c1",
-        s"${giveMeCaseClass.functionIdentifier}().my AS _c2",
-        s"${giveMeCaseClass.functionIdentifier}().clazz AS _c3"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-}
-
-object CompositeFlatteningTest {
-
-  case class TestCaseClass(my: String, clazz: Int)
-
-  object giveMeCaseClass extends ScalarFunction {
-    def eval(): TestCaseClass = {
-      TestCaseClass("hello", 42)
-    }
-
-    override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
-      createTypeInformation[TestCaseClass]
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
index 2e9ee46..9bc3e51 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
@@ -429,9 +429,9 @@ class ExpressionReductionTest extends TableTestBase {
 
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val newTable = util.tEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
 
-    util.tEnv.registerTable("NewTable", newTable)
+    util.tableEnv.registerTable("NewTable", newTable)
 
     val sqlQuery = "SELECT a FROM NewTable"
 
@@ -447,9 +447,9 @@ class ExpressionReductionTest extends TableTestBase {
 
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val newTable = util.tEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
 
-    util.tEnv.registerTable("NewTable", newTable)
+    util.tableEnv.registerTable("NewTable", newTable)
 
     val sqlQuery = "SELECT a FROM NewTable"
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
deleted file mode 100644
index 27dd8d8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table
-
-import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
-  * Test for external catalog query plan.
-  */
-class ExternalCatalogTest extends TableTestBase {
-  private val table1Path: Array[String] = Array("test", "db1", "tb1")
-  private val table1TopLevelPath: Array[String] = Array("test", "tb1")
-  private val table1ProjectedFields: Array[String] = Array("a", "b", "c")
-  private val table2Path: Array[String] = Array("test", "db2", "tb2")
-  private val table2ProjectedFields: Array[String] = Array("d", "e", "g")
-
-  @Test
-  def testBatchTableApi(): Unit = {
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
-    val table1 = tEnv.scan("test", "db1", "tb1")
-    val table2 = tEnv.scan("test", "db2", "tb2")
-    val result = table2
-        .select('d * 2, 'e, 'g.upperCase())
-        .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
-
-    val expected = binaryNode(
-      "DataSetUnion",
-      unaryNode(
-        "DataSetCalc",
-        sourceBatchTableNode(table2Path, table2ProjectedFields),
-        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
-      ),
-      unaryNode(
-        "DataSetCalc",
-        sourceBatchTableNode(table1Path, table1ProjectedFields),
-        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
-      ),
-      term("union", "_c0", "e", "_c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchSQL(): Unit = {
-    val util = batchTestUtil()
-
-    util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
-    val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
-        "(SELECT a * 2, b, c FROM test.db1.tb1)"
-
-    val expected = binaryNode(
-      "DataSetUnion",
-      unaryNode(
-        "DataSetCalc",
-        sourceBatchTableNode(table2Path, table2ProjectedFields),
-        term("select", "*(d, 2) AS EXPR$0", "e", "g"),
-        term("where", "<(d, 3)")),
-      unaryNode(
-        "DataSetCalc",
-        sourceBatchTableNode(table1Path, table1ProjectedFields),
-        term("select", "*(a, 2) AS EXPR$0", "b", "c")
-      ),
-      term("union", "EXPR$0", "e", "g"))
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testStreamTableApi(): Unit = {
-    val util = streamTestUtil()
-    val tEnv = util.tEnv
-
-    util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
-    val table1 = tEnv.scan("test", "db1", "tb1")
-    val table2 = tEnv.scan("test", "db2", "tb2")
-
-    val result = table2.where("d < 3")
-        .select('d * 2, 'e, 'g.upperCase())
-        .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
-
-    val expected = binaryNode(
-      "DataStreamUnion",
-      unaryNode(
-        "DataStreamCalc",
-        sourceStreamTableNode(table2Path, table2ProjectedFields),
-        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2"),
-        term("where", "<(d, 3)")
-      ),
-      unaryNode(
-        "DataStreamCalc",
-        sourceStreamTableNode(table1Path, table1ProjectedFields),
-        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
-      ),
-      term("union all", "_c0", "e", "_c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testStreamSQL(): Unit = {
-    val util = streamTestUtil()
-
-    util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
-    val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
-        "(SELECT a * 2, b, c FROM test.db1.tb1)"
-
-    val expected = binaryNode(
-      "DataStreamUnion",
-      unaryNode(
-        "DataStreamCalc",
-        sourceStreamTableNode(table2Path, table2ProjectedFields),
-        term("select", "*(d, 2) AS EXPR$0", "e", "g"),
-        term("where", "<(d, 3)")),
-      unaryNode(
-        "DataStreamCalc",
-        sourceStreamTableNode(table1Path, table1ProjectedFields),
-        term("select", "*(a, 2) AS EXPR$0", "b", "c")
-      ),
-      term("union all", "EXPR$0", "e", "g"))
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-
-  @Test
-  def testTopLevelTable(): Unit = {
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
-    val table1 = tEnv.scan("test", "tb1")
-    val table2 = tEnv.scan("test", "db2", "tb2")
-    val result = table2
-      .select('d * 2, 'e, 'g.upperCase())
-      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
-
-    val expected = binaryNode(
-      "DataSetUnion",
-      unaryNode(
-        "DataSetCalc",
-        sourceBatchTableNode(table2Path, table2ProjectedFields),
-        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
-      ),
-      unaryNode(
-        "DataSetCalc",
-        sourceBatchTableNode(table1TopLevelPath, table1ProjectedFields),
-        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
-      ),
-      term("union", "_c0", "e", "_c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
-    s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
-        s"fields=[${fields.mkString(", ")}])"
-  }
-
-  def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
-    s"StreamTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
-        s"fields=[${fields.mkString(", ")}])"
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
deleted file mode 100644
index 98a8edb..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.scala._
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
-import org.apache.flink.table.runtime.types.CRowTypeInfo
-import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
-import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, binaryNode, streamTableNode, term, unaryNode}
-import org.apache.flink.types.Row
-import org.junit.Test
-import org.junit.Assert.assertEquals
-
-class TableEnvironmentTest extends TableTestBase {
-
-  val tEnv = new MockTableEnvironment
-
-  val tupleType = new TupleTypeInfo(
-    INT_TYPE_INFO,
-    STRING_TYPE_INFO,
-    DOUBLE_TYPE_INFO)
-
-  val rowType = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
-
-  val cRowType = new CRowTypeInfo(rowType)
-
-  val caseClassType: TypeInformation[CClass] = implicitly[TypeInformation[CClass]]
-
-  val pojoType: TypeInformation[PojoClass] = TypeExtractor.createTypeInfo(classOf[PojoClass])
-
-  val atomicType = INT_TYPE_INFO
-
-  val genericRowType = new GenericTypeInfo[Row](classOf[Row])
-
-  @Test
-  def testGetFieldInfoRow(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(rowType)
-
-    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-  
-  @Test
-  def testGetFieldInfoRowNames(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      rowType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-  
-  @Test
-  def testGetFieldInfoTuple(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(tupleType)
-
-    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoCClass(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(caseClassType)
-
-    fieldInfo._1.zip(Array("cf1", "cf2", "cf3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoPojo(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(pojoType)
-
-    fieldInfo._1.zip(Array("pf1", "pf2", "pf3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoAtomic(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(atomicType)
-
-    fieldInfo._1.zip(Array("f0")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoGenericRow(): Unit = {
-    tEnv.getFieldInfo(genericRowType)
-  }
-
-  @Test
-  def testGetFieldInfoTupleNames(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      tupleType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoCClassNames(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      caseClassType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoPojoNames1(): Unit = {
-    tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
-  }
-
-  @Test
-  def testGetFieldInfoPojoNames2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        UnresolvedFieldReference("pf3"),
-        UnresolvedFieldReference("pf1"),
-        UnresolvedFieldReference("pf2")
-      ))
-
-    fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoAtomicName1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      atomicType,
-      Array(UnresolvedFieldReference("name")))
-
-    fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoAtomicName2(): Unit = {
-    tEnv.getFieldInfo(
-      atomicType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2")
-      ))
-  }
-
-  @Test
-  def testGetFieldInfoTupleAlias1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      tupleType,
-      Array(
-        Alias(UnresolvedFieldReference("f0"), "name1"),
-        Alias(UnresolvedFieldReference("f1"), "name2"),
-        Alias(UnresolvedFieldReference("f2"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoTupleAlias2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      tupleType,
-      Array(
-        Alias(UnresolvedFieldReference("f2"), "name1"),
-        Alias(UnresolvedFieldReference("f0"), "name2"),
-        Alias(UnresolvedFieldReference("f1"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoTupleAlias3(): Unit = {
-    tEnv.getFieldInfo(
-      tupleType,
-      Array(
-        Alias(UnresolvedFieldReference("xxx"), "name1"),
-        Alias(UnresolvedFieldReference("yyy"), "name2"),
-        Alias(UnresolvedFieldReference("zzz"), "name3")
-      ))
-  }
-
-  @Test
-  def testGetFieldInfoCClassAlias1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      caseClassType,
-      Array(
-        Alias(UnresolvedFieldReference("cf1"), "name1"),
-        Alias(UnresolvedFieldReference("cf2"), "name2"),
-        Alias(UnresolvedFieldReference("cf3"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoCClassAlias2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      caseClassType,
-      Array(
-        Alias(UnresolvedFieldReference("cf3"), "name1"),
-        Alias(UnresolvedFieldReference("cf1"), "name2"),
-        Alias(UnresolvedFieldReference("cf2"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoCClassAlias3(): Unit = {
-    tEnv.getFieldInfo(
-      caseClassType,
-      Array(
-        Alias(UnresolvedFieldReference("xxx"), "name1"),
-        Alias(UnresolvedFieldReference("yyy"), "name2"),
-        Alias(UnresolvedFieldReference("zzz"), "name3")
-      ))
-  }
-
-  @Test
-  def testGetFieldInfoPojoAlias1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        Alias(UnresolvedFieldReference("pf1"), "name1"),
-        Alias(UnresolvedFieldReference("pf2"), "name2"),
-        Alias(UnresolvedFieldReference("pf3"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoPojoAlias2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        Alias(UnresolvedFieldReference("pf3"), "name1"),
-        Alias(UnresolvedFieldReference("pf1"), "name2"),
-        Alias(UnresolvedFieldReference("pf2"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoPojoAlias3(): Unit = {
-    tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        Alias(UnresolvedFieldReference("xxx"), "name1"),
-        Alias(UnresolvedFieldReference("yyy"), "name2"),
-        Alias(UnresolvedFieldReference("zzz"), "name3")
-      ))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoAtomicAlias(): Unit = {
-    tEnv.getFieldInfo(
-      atomicType,
-      Array(
-        Alias(UnresolvedFieldReference("name1"), "name2")
-      ))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoGenericRowAlias(): Unit = {
-    tEnv.getFieldInfo(
-      genericRowType,
-      Array(UnresolvedFieldReference("first")))
-  }
-
-  @Test
-  def testSqlWithoutRegisteringForBatchTables(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
-
-    val sqlTable = util.tEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a, b, c"),
-      term("where", ">(b, 12)"))
-
-    util.verifyTable(sqlTable, expected)
-
-    val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
-
-    val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table, $table2 WHERE c = d")
-
-    val join = unaryNode(
-      "DataSetJoin",
-      binaryNode(
-        "DataSetCalc",
-        batchTableNode(0),
-        batchTableNode(1),
-        term("select", "c")),
-      term("where", "=(c, d)"),
-      term("join", "c, d, e, f"),
-      term("joinType", "InnerJoin"))
-
-    val expected2 = unaryNode(
-      "DataSetCalc",
-      join,
-      term("select", "d, e, f"))
-
-    util.verifyTable(sqlTable2, expected2)
-  }
-
-  @Test
-  def testSqlWithoutRegisteringForStreamTables(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
-
-    val sqlTable = util.tEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select", "a, b, c"),
-      term("where", ">(b, 12)"))
-
-    util.verifyTable(sqlTable, expected)
-
-    val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
-
-    val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
-        s"UNION ALL SELECT a, b, c FROM $table")
-
-    val expected2 = binaryNode(
-      "DataStreamUnion",
-      streamTableNode(1),
-      streamTableNode(0),
-      term("union all", "d, e, f"))
-
-    util.verifyTable(sqlTable2, expected2)
-  }
-
-}
-
-case class CClass(cf1: Int, cf2: String, cf3: Double)
-
-class PojoClass(var pf1: Int, var pf2: String, var pf3: Double) {
-  def this() = this(0, "", 0.0)
-}
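
Note for readers skimming the diff: the getFieldInfo tests above all assert one
contract, a pair of default field names and positional indices. A minimal,
self-contained Scala sketch of that contract (FieldInfoSketch and fieldInfo are
hypothetical stand-ins, not Flink's actual API):

object FieldInfoSketch {
  // Hypothetical stand-in for tEnv.getFieldInfo on a tuple-like type:
  // default names "f0".."f(n-1)" paired with positional indices 0..(n-1).
  def fieldInfo(arity: Int): (Array[String], Array[Int]) =
    (Array.tabulate(arity)(i => s"f$i"), Array.tabulate(arity)(identity))

  def main(args: Array[String]): Unit = {
    val (names, indices) = fieldInfo(3)
    // Mirrors the zip/assertEquals pattern used by the tests above.
    names.zip(Array("f0", "f1", "f2")).foreach { case (actual, expected) =>
      assert(actual == expected)
    }
    indices.zip(Array(0, 1, 2)).foreach { case (actual, expected) =>
      assert(actual == expected)
    }
  }
}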

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSchemaTest.scala
deleted file mode 100644
index 39c4c16..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSchemaTest.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, TableSchema, Types}
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.Test
-
-class TableSchemaTest extends TableTestBase {
-
-  @Test
-  def testBatchTableSchema(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, String)]("MyTable", 'a, 'b)
-    val schema = table.getSchema
-
-    assertEquals("a", schema.getColumnNames.apply(0))
-    assertEquals("b", schema.getColumnNames.apply(1))
-
-    assertEquals(Types.INT, schema.getTypes.apply(0))
-    assertEquals(Types.STRING, schema.getTypes.apply(1))
-
-    val expectedString = "root\n" +
-      " |-- a: Integer\n" +
-      " |-- b: String\n"
-    assertEquals(expectedString, schema.toString)
-
-    assertTrue(schema.getColumnName(3).isEmpty)
-    assertTrue(schema.getType(-1).isEmpty)
-    assertTrue(schema.getType("c").isEmpty)
-  }
-
-  @Test
-  def testStreamTableSchema(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, String)]("MyTable", 'a, 'b)
-    val schema = table.getSchema
-
-    assertEquals("a", schema.getColumnNames.apply(0))
-    assertEquals("b", schema.getColumnNames.apply(1))
-
-    assertEquals(Types.INT, schema.getTypes.apply(0))
-    assertEquals(Types.STRING, schema.getTypes.apply(1))
-
-    val expectedString = "root\n" +
-      " |-- a: Integer\n" +
-      " |-- b: String\n"
-    assertEquals(expectedString, schema.toString)
-
-    assertTrue(schema.getColumnName(3).isEmpty)
-    assertTrue(schema.getType(-1).isEmpty)
-    assertTrue(schema.getType("c").isEmpty)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testInvalidSchema(): Unit = {
-    val fieldNames = Array("a", "b", "c")
-    val typeInfos: Array[TypeInformation[_]] = Array(
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO)
-    new TableSchema(fieldNames, typeInfos)
-  }
-}
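
The testInvalidSchema case above hinges on an arity check between field names
and field types. A standalone sketch of that check (class name and message are
illustrative; Flink's TableSchema constructor throws a TableException here,
whereas require throws IllegalArgumentException):

object SchemaAritySketch {
  // Illustrative arity check only; see the hedge in the note above.
  final class Schema(names: Array[String], types: Array[String]) {
    require(
      names.length == types.length,
      s"Number of field names (${names.length}) and field types (${types.length}) differ.")
  }

  def main(args: Array[String]): Unit = {
    new Schema(Array("a", "b"), Array("Integer", "String")) // accepted
    try {
      new Schema(Array("a", "b", "c"), Array("Integer", "String")) // rejected
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}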

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
deleted file mode 100644
index 24a32de..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
+++ /dev/null
@@ -1,421 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table
-
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.{CsvTableSource, TableSource}
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.expressions.utils._
-import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource}
-import org.junit.{Assert, Test}
-
-class TableSourceTest extends TableTestBase {
-
-  private val projectedFields: Array[String] = Array("last", "id", "score")
-  private val noCalcFields: Array[String] = Array("id", "score", "first")
-
-  @Test
-  def testTableSourceScanToString(): Unit = {
-    val (tableSource1, _) = filterableTableSource
-    val (tableSource2, _) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource("table1", tableSource1)
-    tEnv.registerTableSource("table2", tableSource2)
-
-    val table1 = tEnv.scan("table1").where("amount > 2")
-    val table2 = tEnv.scan("table2").where("amount > 2")
-    val result = table1.unionAll(table2)
-
-    val expected = binaryNode(
-      "DataSetUnion",
-      batchFilterableSourceTableNode(
-        "table1",
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      batchFilterableSourceTableNode(
-        "table2",
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      term("union", "name, id, amount, price")
-    )
-    util.verifyTable(result, expected)
-  }
-
-  // batch plan
-
-  @Test
-  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('last.upperCase(), 'id.floor(), 'score * 2)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchSourceTableNode(tableName, projectedFields),
-      term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchProjectableSourceScanPlanSQL(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = batchTestUtil()
-
-    util.tEnv.registerTableSource(tableName, tableSource)
-
-    val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchSourceTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('id, 'score, 'first)
-
-    val expected = batchSourceTableNode(tableName, noCalcFields)
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchFilterableWithoutPushDown(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-        .scan(tableName)
-        .select('price, 'id, 'amount)
-        .where("price * 2 < 32")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price")),
-      term("select", "price", "id", "amount"),
-      term("where", "<(*(price, 2), 32)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchFilterablePartialPushDown(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .where("amount > 2 && price * 2 < 32")
-      .select('price, 'name.lowerCase(), 'amount)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchFilterableSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      term("select", "price", "LOWER(name) AS _c1", "amount"),
-      term("where", "<(*(price, 2), 32)")
-    )
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchFilterableFullyPushedDown(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-        .scan(tableName)
-        .select('price, 'id, 'amount)
-        .where("amount > 2 && amount < 32")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchFilterableSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price"),
-        "'amount > 2 && 'amount < 32"),
-      term("select", "price", "id", "amount")
-    )
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchFilterableWithUnconvertedExpression(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-        .scan(tableName)
-        .select('price, 'id, 'amount)
-        .where("amount > 2 && (amount < 32 || amount.cast(LONG) > 10)") // cast can not be converted
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchFilterableSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      term("select", "price", "id", "amount"),
-      term("where", "OR(<(amount, 32), >(CAST(amount), 10))")
-    )
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchFilterableWithUDF(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-    val func = Func0
-    tEnv.registerFunction("func0", func)
-
-    val result = tEnv
-        .scan(tableName)
-        .select('price, 'id, 'amount)
-        .where("amount > 2 && func0(amount) < 32")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchFilterableSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      term("select", "price", "id", "amount"),
-      term("where", s"<(${func.functionIdentifier}(amount), 32)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  // stream plan
-
-  @Test
-  def testStreamProjectableSourceScanPlanTableApi(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = streamTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('last, 'id.floor(), 'score * 2)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamSourceTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanPlanSQL(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = streamTestUtil()
-
-    util.tEnv.registerTableSource(tableName, tableSource)
-
-    val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamSourceTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = streamTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('id, 'score, 'first)
-
-    val expected = streamSourceTableNode(tableName, noCalcFields)
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testStreamFilterableSourceScanPlanTableApi(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = streamTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('price, 'id, 'amount)
-      .where("amount > 2 && price * 2 < 32")
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamFilterableSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      term("select", "price", "id", "amount"),
-      term("where", "<(*(price, 2), 32)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  // csv builder
-
-  @Test
-  def testCsvTableSourceBuilder(): Unit = {
-    val source1 = CsvTableSource.builder()
-      .path("/path/to/csv")
-      .field("myfield", Types.STRING)
-      .field("myfield2", Types.INT)
-      .quoteCharacter(';')
-      .fieldDelimiter("#")
-      .lineDelimiter("\r\n")
-      .commentPrefix("%%")
-      .ignoreFirstLine()
-      .ignoreParseErrors()
-      .build()
-
-    val source2 = new CsvTableSource(
-      "/path/to/csv",
-      Array("myfield", "myfield2"),
-      Array(Types.STRING, Types.INT),
-      "#",
-      "\r\n",
-      ';',
-      true,
-      "%%",
-      true)
-
-    Assert.assertEquals(source1, source2)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithNullPath(): Unit = {
-    CsvTableSource.builder()
-      .field("myfield", Types.STRING)
-      // should fail, path is not defined
-      .build()
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
-    CsvTableSource.builder()
-      .path("/path/to/csv")
-      .field("myfield", Types.STRING)
-      // should fail, field names must not be duplicated
-      .field("myfield", Types.INT)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithEmptyField(): Unit = {
-    CsvTableSource.builder()
-      .path("/path/to/csv")
-      // should fail, no field is defined
-      .build()
-  }
-
-  // utils
-
-  def filterableTableSource: (TableSource[_], String) = {
-    val tableSource = new TestFilterableTableSource
-    (tableSource, "filterableTable")
-  }
-
-  def csvTable: (CsvTableSource, String) = {
-    val csvTable = CommonTestData.getCsvTableSource
-    val tableName = "csvTable"
-    (csvTable, tableName)
-  }
-
-  def batchSourceTableNode(sourceName: String, fields: Array[String]): String = {
-    s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
-  }
-
-  def streamSourceTableNode(sourceName: String, fields: Array[String]): String = {
-    s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
-  }
-
-  def batchFilterableSourceTableNode(
-      sourceName: String,
-      fields: Array[String],
-      exp: String): String = {
-    "BatchTableSourceScan(" +
-      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
-  }
-
-  def streamFilterableSourceTableNode(
-      sourceName: String,
-      fields: Array[String],
-      exp: String): String = {
-    "StreamTableSourceScan(" +
-      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
-  }
-}
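
The expected plans in this file are assembled from small string helpers that
nest operator nodes. A self-contained sketch of that composition style (the
helpers are redefined locally for illustration; in the real tests they come
from TableTestUtil):

object PlanStringSketch {
  def term(name: String, values: String*): String =
    s"$name=[${values.mkString(", ")}]"

  def batchSourceTableNode(name: String, fields: Array[String]): String =
    s"BatchTableSourceScan(table=[[$name]], fields=[${fields.mkString(", ")}])"

  def unaryNode(operator: String, input: String, terms: String*): String =
    s"$operator($input, ${terms.mkString(", ")})"

  def main(args: Array[String]): Unit = {
    // Composes the same shape as the expected plan in
    // testBatchProjectableSourceScanPlanSQL above.
    println(unaryNode(
      "DataSetCalc",
      batchSourceTableNode("csvTable", Array("last", "id", "score")),
      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")))
  }
}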

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
deleted file mode 100644
index 10af4d7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.junit.Assert.assertEquals
-import org.junit._
-
-class ExplainTest
-  extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
-
-  val testFilePath = ExplainTest.this.getClass.getResource("/").getFile
-
-  @Test
-  def testFilterWithoutExtended(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements((1, "hello"))
-      .toTable(tEnv, 'a, 'b)
-      .filter("a % 2 = 0")
-
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-  @Test
-  def testFilterWithExtended(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements((1, "hello"))
-      .toTable(tEnv, 'a, 'b)
-      .filter("a % 2 = 0")
-
-    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-  @Test
-  def testJoinWithoutExtended(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
-    val table = table1.join(table2).where("b = d").select("a, c")
-
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-  @Test
-  def testJoinWithExtended(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
-    val table = table1.join(table2).where("b = d").select("a, c")
-
-    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-  @Test
-  def testUnionWithoutExtended(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table = table1.unionAll(table2)
-
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-  @Test
-  def testUnionWithExtended(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table = table1.unionAll(table2)
-
-    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-}
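
Each comparison above normalizes "\r\n" to "\n" on both the explain() output
and the checked-in .out file, so the tests pass regardless of the platform's
line endings. A minimal standalone sketch of that normalization (the sample
strings are illustrative):

object ExplainCompareSketch {
  def normalize(s: String): String = s.replaceAll("\\r\\n", "\n")

  def main(args: Array[String]): Unit = {
    // A file checked out with Windows line endings vs. the engine's output.
    val fromFile   = "== Abstract Syntax Tree ==\r\nLogicalFilter(...)"
    val fromEngine = "== Abstract Syntax Tree ==\nLogicalFilter(...)"
    assert(normalize(fromFile) == normalize(fromEngine))
  }
}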

