flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [1/3] flink git commit: [FLINK-3656] [table] Consolidate ITCases
Date Thu, 29 Sep 2016 13:47:48 GMT
Repository: flink
Updated Branches:
  refs/heads/master 8243138c3 -> 7758571ae


http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala
deleted file mode 100644
index 8889b37..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupingOnNonExistentField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. '_foo not a valid field
-      .groupBy('_foo)
-      .select('a.avg)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupingInvalidSelection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('a, 'b)
-      // must fail. 'c is not a grouping key or aggregation
-      .select('c)
-  }
-
-  @Test
-  def testGroupedAggregate(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum)
-
-    val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupingKeyForwardIfNotUsed(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum)
-
-    val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupNoAggregation(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum as 'd, 'b)
-      .groupBy('b, 'd)
-      .select('b)
-
-    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithLongKeys(): Unit = {
-    // This uses very long keys to force serialized comparison.
-    // With short keys, the normalized key is sufficient.
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = env.fromElements(
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2))
-      .rebalance().setParallelism(2).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('a, 'b)
-      .select('c.sum)
-
-    val expected = "10\n" + "8\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithConstant1(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 4 as 'four, 'b)
-      .groupBy('four, 'a)
-      .select('four, 'b.sum)
-
-    val expected = "4,2\n" + "4,3\n" + "4,5\n" + "4,5\n" + "4,5\n" + "4,6\n" +
-      "4,6\n" + "4,6\n" + "4,3\n" + "4,4\n" + "4,6\n" + "4,1\n" + "4,4\n" +
-      "4,4\n" + "4,5\n" + "4,6\n" + "4,2\n" + "4,3\n" + "4,4\n" + "4,5\n" + "4,6\n"
-    val results = t.toDataSet[Row].collect()
-
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithConstant2(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-        .select('b, 4 as 'four, 'a)
-        .groupBy('b, 'four)
-        .select('four, 'a.sum)
-
-    val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithExpression(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-        .groupBy('e, 'b % 3)
-        .select('c.min, 'e, 'a.avg, 'd.count)
-
-    val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" +
-        "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithFilter(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum)
-      .where('b === 2)
-
-    val expected = "2,5\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
index f6e6081..67cac14 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
@@ -19,12 +19,14 @@
 package org.apache.flink.api.scala.batch.table
 
 import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableException, ValidationException, Row, TableEnvironment}
 import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -32,12 +34,15 @@ import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
-class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+class JoinITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
 
   @Test
   def testJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -53,7 +58,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   def testJoinWithFilter(): Unit = {
 
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
@@ -68,7 +73,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithJoinFilter(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -84,7 +89,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -100,7 +105,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[ValidationException])
   def testJoinNonExistingKey(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -114,7 +119,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[ValidationException])
   def testJoinWithNonMatchingKeyTypes(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -128,7 +133,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[ValidationException])
   def testJoinWithAmbiguousFields(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c)
@@ -142,7 +147,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[TableException])
   def testNoEqualityJoinPredicate1(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -156,7 +161,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[TableException])
   def testNoEqualityJoinPredicate2(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -170,7 +175,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithAggregation(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -185,7 +190,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithGroupedAggregation(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -203,7 +208,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinPushThroughJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -223,7 +228,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithDisjunctivePred(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -240,7 +245,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithExpressionPreds(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -259,8 +264,8 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[ValidationException])
   def testJoinTablesFromDifferentEnvs(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h)
@@ -272,7 +277,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testLeftJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
@@ -294,7 +299,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[ValidationException])
   def testNoJoinCondition(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
@@ -306,7 +311,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[ValidationException])
   def testNoEquiJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
@@ -318,7 +323,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testRightJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
@@ -337,7 +342,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testRightJoinWithNotOnlyEquiJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
@@ -353,7 +358,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testFullOuterJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala
deleted file mode 100644
index 1143afd..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Assert._
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class SelectITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).select('_1, '_2, '_3)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('a, 'b, 'c)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
-      .select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectRenameAll(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
-      .select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectInvalidFieldFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. Field 'foo does not exist
-      .select('a, 'foo)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectAmbiguousRenaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'foo
-      .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectAmbiguousRenaming2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'a
-      .select('a, 'b as 'a).toDataSet[Row].print()
-  }
-
-  @Test
-  def testSelectStar(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAliasStarException(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1 as '*, '_2 as 'b, '_1 as 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala
deleted file mode 100644
index 84bdbb0..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class ToTableITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testToTable(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testToTableFromCaseClass(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data = List(
-      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
-      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
-      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
-    val t =  env.fromCollection(data)
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-      .select('a, 'b, 'c, 'd)
-
-    val expected: String =
-      "Peter,28,4000.0,Sales\n" +
-      "Anna,56,10000.0,Engineering\n" +
-      "Lucy,42,6000.0,HR\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testToTableFromAndToCaseClass(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data = List(
-      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
-      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
-      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
-    val t =  env.fromCollection(data)
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-      .select('a, 'b, 'c, 'd)
-
-    val expected: String =
-      "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
-      "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
-      "SomeCaseClass(Lucy,42,6000.0,HR)\n"
-    val results = t.toDataSet[SomeCaseClass].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithToFewFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Number of fields does not match.
-      .toTable(tEnv, 'a, 'b)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithToManyFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Number of fields does not match.
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithAmbiguousFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Field names not unique.
-      .toTable(tEnv, 'a, 'b, 'b)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithNonFieldReference1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    // Must fail. as() can only have field references
-    CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a + 1, 'b, 'c)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithNonFieldReference2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    // Must fail. as() can only have field references
-    CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a as 'foo, 'b, 'c)
-  }
-
-}
-
-case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
-  def this() { this("", 0, 0.0, "") }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
index 772850d..2ce42d4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
@@ -20,8 +20,7 @@ package org.apache.flink.api.scala.batch.utils
 
 import java.util
 
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode._
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.{EFFICIENT, NO_NULL, TableConfigMode}
 import org.apache.flink.api.table.TableConfig
 import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -37,7 +36,7 @@ class TableProgramsTestBase(
   def config: TableConfig = {
     val conf = new TableConfig
     tableConfigMode match {
-      case NULL =>
+      case NO_NULL =>
         conf.setNullCheck(false)
       case EFFICIENT =>
         conf.setEfficientTypeUsage(true)
@@ -48,21 +47,14 @@ class TableProgramsTestBase(
 }
 
 object TableProgramsTestBase {
-  sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: Boolean }
-  object TableConfigMode {
-    case object DEFAULT extends TableConfigMode {
-      val nullCheck = false; val efficientTypes = false
-    }
-    case object NULL extends TableConfigMode {
-      val nullCheck = true; val efficientTypes = false
-    }
-    case object EFFICIENT extends TableConfigMode {
-      val nullCheck = false; val efficientTypes = true
-    }
-  }
+  case class TableConfigMode(nullCheck: Boolean, efficientTypes: Boolean)
+
+  val DEFAULT = TableConfigMode(nullCheck = true, efficientTypes = false)
+  val NO_NULL = TableConfigMode(nullCheck = false, efficientTypes = false)
+  val EFFICIENT = TableConfigMode(nullCheck = false, efficientTypes = true)
 
   @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
   def parameters(): util.Collection[Array[java.lang.Object]] = {
-    Seq[Array[AnyRef]](Array(TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT))
+    Seq[Array[AnyRef]](Array(TestExecutionMode.COLLECTION, DEFAULT))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
new file mode 100644
index 0000000..578ad30
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class CalcITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testSimpleSelectAll(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSelectFirst(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("1", "2", "3")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+
+    // verify ProjectMergeRule.
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
+      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
+      .select('a, 'b)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
+      "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
+      "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSimpleSelectAllWithAs(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .select('a, 'b, 'c)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testAsWithToFewFields(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testAsWithToManyFields(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testAsWithAmbiguousFields(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+
+  @Test(expected = classOf[TableException])
+  def testOnlyFieldRefInAs(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSimpleFilter(): Unit = {
+    /*
+     * Test simple filter
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter('a === 3)
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    /*
+     * Test all-rejecting filter
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(false) )
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertEquals(true, StreamITCase.testResults.isEmpty)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    /*
+     * Test all-passing filter
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(true) )
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField(): Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 === 0 )
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,2,Hello", "4,3,Hello world, how are you?",
+      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
+      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
+      "18,6,Comment#12", "20,6,Comment#14")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNotEquals(): Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 !== 0)
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+    val expected = mutable.MutableList(
+      "1,1,Hi", "3,2,Hello world",
+      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
+      "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
+      "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/FilterITCase.scala
deleted file mode 100644
index 45b9b04..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/FilterITCase.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{Row, TableEnvironment}
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class FilterITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testSimpleFilter(): Unit = {
-    /*
-     * Test simple filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter('a === 3)
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-    /*
-     * Test all-rejecting filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    assertEquals(true, StreamITCase.testResults.isEmpty)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-    /*
-     * Test all-passing filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "2,2,Hello", "4,3,Hello world, how are you?",
-      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
-      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
-      "18,6,Comment#12", "20,6,Comment#14")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testNotEquals(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 !== 0)
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-    val expected = mutable.MutableList(
-      "1,1,Hi", "3,2,Hello world",
-      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
-      "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
-      "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala
deleted file mode 100644
index c6a2139..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class SelectITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSelectFirst(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("1", "2", "3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-
-    // verify ProjectMergeRule.
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
-      .select('a, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
-      "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
-      "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testAsWithToFewFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testAsWithToManyFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testAsWithAmbiguousFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-
-  @Test(expected = classOf[TableException])
-  def testOnlyFieldRefInAs(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}


Mime
View raw message