flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [3/9] flink git commit: [FLINK-3847] Restructure flink-table test packages.
Date Sun, 01 May 2016 12:47:17 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamTestData.scala
new file mode 100644
index 0000000..321b8ac
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamTestData.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream.utils
+
+import org.apache.flink.api.scala._
+import scala.collection.mutable
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+
+object StreamTestData {
+
+  def getSmall3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "Hi"))
+    data.+=((2, 2L, "Hello"))
+    data.+=((3, 2L, "Hello world"))
+    env.fromCollection(data)
+  }
+
+  def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "Hi"))
+    data.+=((2, 2L, "Hello"))
+    data.+=((3, 2L, "Hello world"))
+    data.+=((4, 3L, "Hello world, how are you?"))
+    data.+=((5, 3L, "I am fine."))
+    data.+=((6, 3L, "Luke Skywalker"))
+    data.+=((7, 4L, "Comment#1"))
+    data.+=((8, 4L, "Comment#2"))
+    data.+=((9, 4L, "Comment#3"))
+    data.+=((10, 4L, "Comment#4"))
+    data.+=((11, 5L, "Comment#5"))
+    data.+=((12, 5L, "Comment#6"))
+    data.+=((13, 5L, "Comment#7"))
+    data.+=((14, 5L, "Comment#8"))
+    data.+=((15, 5L, "Comment#9"))
+    data.+=((16, 6L, "Comment#10"))
+    data.+=((17, 6L, "Comment#11"))
+    data.+=((18, 6L, "Comment#12"))
+    data.+=((19, 6L, "Comment#13"))
+    data.+=((20, 6L, "Comment#14"))
+    data.+=((21, 6L, "Comment#15"))
+    env.fromCollection(data)
+  }
+
+  def get5TupleDataStream(env: StreamExecutionEnvironment):
+      DataStream[(Int, Long, Int, String, Long)] = {
+
+    val data = new mutable.MutableList[(Int, Long, Int, String, Long)]
+    data.+=((1, 1L, 0, "Hallo", 1L))
+    data.+=((2, 2L, 1, "Hallo Welt", 2L))
+    data.+=((2, 3L, 2, "Hallo Welt wie", 1L))
+    data.+=((3, 4L, 3, "Hallo Welt wie gehts?", 2L))
+    data.+=((3, 5L, 4, "ABC", 2L))
+    data.+=((3, 6L, 5, "BCD", 3L))
+    data.+=((4, 7L, 6, "CDE", 2L))
+    data.+=((4, 8L, 7, "DEF", 1L))
+    data.+=((4, 9L, 8, "EFG", 1L))
+    data.+=((4, 10L, 9, "FGH", 2L))
+    data.+=((5, 11L, 10, "GHI", 1L))
+    data.+=((5, 12L, 11, "HIJ", 3L))
+    data.+=((5, 13L, 12, "IJK", 3L))
+    data.+=((5, 14L, 13, "JKL", 2L))
+    data.+=((5, 15L, 14, "KLM", 2L))
+    env.fromCollection(data)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
deleted file mode 100644
index 3147a8e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.streaming.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{TableEnvironment, Row}
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import scala.collection.mutable
-import org.junit.Test
-import org.junit.Assert._
-import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase
-import org.apache.flink.api.scala.table.streaming.test.utils.StreamTestData
-
-class FilterITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testSimpleFilter(): Unit = {
-    /*
-     * Test simple filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter('a === 3)
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-    /*
-     * Test all-rejecting filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    assertEquals(true, StreamITCase.testResults.isEmpty)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-    /*
-     * Test all-passing filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "2,2,Hello", "4,3,Hello world, how are you?",
-      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
-      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
-      "18,6,Comment#12", "20,6,Comment#14")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testNotEquals(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 !== 0)
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-    val expected = mutable.MutableList(
-      "1,1,Hi", "3,2,Hello world",
-      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
-      "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
-      "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
deleted file mode 100644
index d606a80..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/SelectITCase.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.streaming.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{TableEnvironment, Row}
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import scala.collection.mutable
-import org.junit.Test
-import org.junit.Assert._
-import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase
-import org.apache.flink.api.scala.table.streaming.test.utils.StreamTestData
-
-class SelectITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSelectFirst(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("1", "2", "3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-
-    // verify ProjectMergeRule.
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
-      .select('a, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
-      "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
-      "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testAsWithToFewFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testAsWithToManyFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testAsWithAmbiguousFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testOnlyFieldRefInAs(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/TableSourceITCase.scala
deleted file mode 100644
index f3d60a4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/TableSourceITCase.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.streaming.test
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase
-import org.apache.flink.api.table.sources.StreamTableSource
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, TableEnvironment}
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class TableSourceITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testStreamTableSourceTableAPI(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
-    tEnv.ingest("MyTestTable")
-      .where('amount < 4)
-      .select('amount * 'id, 'name)
-      .toDataStream[Row]
-      .addSink(new StreamITCase.StringSink)
-
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "0,Record_0", "0,Record_16", "0,Record_32",
-      "1,Record_1", "17,Record_17", "36,Record_18",
-      "4,Record_2", "57,Record_19", "9,Record_3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testStreamTableSourceSQL(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
-    tEnv.sql(
-      "SELECT amount * id, name FROM MyTestTable WHERE amount < 4")
-      .toDataStream[Row]
-      .addSink(new StreamITCase.StringSink)
-
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "0,Record_0", "0,Record_16", "0,Record_32",
-      "1,Record_1", "17,Record_17", "36,Record_18",
-      "4,Record_2", "57,Record_19", "9,Record_3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-}
-
-class TestStreamTableSource(val numRecords: Int) extends StreamTableSource[Row] {
-
-  val fieldTypes: Array[TypeInformation[_]] = Array(
-    BasicTypeInfo.STRING_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO,
-    BasicTypeInfo.INT_TYPE_INFO
-  )
-
-  /** Returns the data of the table as a [[DataStream]]. */
-  override def getDataStream(execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = {
-    execEnv.addSource(new GeneratingSourceFunction(numRecords), getReturnType).setParallelism(1)
-  }
-
-  /** Returns the types of the table fields. */
-  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
-  /** Returns the names of the table fields. */
-  override def getFieldsNames: Array[String] = Array("name", "id", "amount")
-
-  /** Returns the [[TypeInformation]] for the return type. */
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes)
-
-  /** Returns the number of fields of the table. */
-  override def getNumberOfFields: Int = 3
-}
-
-class GeneratingSourceFunction(val num: Long) extends SourceFunction[Row] {
-
-  var running = true
-
-  override def run(ctx: SourceContext[Row]): Unit = {
-    var cnt = 0L
-    while(running && cnt < num) {
-      val out = new Row(3)
-      out.setField(0, s"Record_$cnt")
-      out.setField(1, cnt)
-      out.setField(2, (cnt % 16).toInt)
-
-      ctx.collect(out)
-      cnt += 1
-    }
-  }
-
-  override def cancel(): Unit = {
-    running = false
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala
deleted file mode 100644
index ae81f3b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnionITCase.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.streaming.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{TableException, TableEnvironment, Row}
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.JavaConversions._
-import org.junit.Test
-import org.junit.Assert._
-import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase
-import org.apache.flink.api.scala.table.streaming.test.utils.StreamTestData
-
-class UnionITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testUnion(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2).select('c)
-
-    val results = unionDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "Hi", "Hello", "Hello world", "Hi", "Hello", "Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUnionWithFilter(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
-    val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
-
-    val results = unionDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("Hi", "Hallo")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testUnionFieldsNameNotOverlap1(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
-    val unionDs = ds1.unionAll(ds2)
-
-    val results = unionDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    assertEquals(true, StreamITCase.testResults.isEmpty)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testUnionFieldsNameNotOverlap2(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2)
-
-    val results = unionDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    println(StreamITCase.testResults)
-    assertEquals(true, StreamITCase.testResults.isEmpty)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testUnionTablesFromDifferentEnvs(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.unionAll(ds2)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala
deleted file mode 100644
index f7bd0ff..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.streaming.test
-
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.table.streaming.test.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.api.table.{TableException, TableEnvironment}
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
-
-import scala.collection.mutable
-
-class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
-
-  @Test(expected = classOf[TableException])
-  def testSelectWithAggregation(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGroupBy(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-      .groupBy('_1)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testDistinct(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).distinct()
-  }
-
-  @Test(expected = classOf[TableException])
-  def testJoin(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.join(t2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala
deleted file mode 100644
index e7dc518..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamITCase.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.streaming.test.utils
-
-import java.util.Collections
-
-import org.apache.flink.api.table.Row
-import org.junit.Assert._
-import scala.collection.mutable
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import scala.collection.JavaConverters._
-
-object StreamITCase {
-
-  var testResults = mutable.MutableList.empty[String]
-
-  def clear = {
-    StreamITCase.testResults.clear()
-  }
-
-  def compareWithList(expected: java.util.List[String]): Unit = {
-    Collections.sort(expected)
-    assertEquals(expected.asScala, StreamITCase.testResults.sorted)
-  }
-
-  final class StringSink extends RichSinkFunction[Row]() {
-    def invoke(value: Row) {
-      testResults.synchronized {
-        testResults += value.toString 
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala
deleted file mode 100644
index 3ab5f95..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/utils/StreamTestData.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.streaming.test.utils
-
-import org.apache.flink.api.scala._
-import scala.collection.mutable
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-
-object StreamTestData {
-
-  def getSmall3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Hi"))
-    data.+=((2, 2L, "Hello"))
-    data.+=((3, 2L, "Hello world"))
-    env.fromCollection(data)
-  }
-
-  def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Hi"))
-    data.+=((2, 2L, "Hello"))
-    data.+=((3, 2L, "Hello world"))
-    data.+=((4, 3L, "Hello world, how are you?"))
-    data.+=((5, 3L, "I am fine."))
-    data.+=((6, 3L, "Luke Skywalker"))
-    data.+=((7, 4L, "Comment#1"))
-    data.+=((8, 4L, "Comment#2"))
-    data.+=((9, 4L, "Comment#3"))
-    data.+=((10, 4L, "Comment#4"))
-    data.+=((11, 5L, "Comment#5"))
-    data.+=((12, 5L, "Comment#6"))
-    data.+=((13, 5L, "Comment#7"))
-    data.+=((14, 5L, "Comment#8"))
-    data.+=((15, 5L, "Comment#9"))
-    data.+=((16, 6L, "Comment#10"))
-    data.+=((17, 6L, "Comment#11"))
-    data.+=((18, 6L, "Comment#12"))
-    data.+=((19, 6L, "Comment#13"))
-    data.+=((20, 6L, "Comment#14"))
-    data.+=((21, 6L, "Comment#15"))
-    env.fromCollection(data)
-  }
-
-  def get5TupleDataStream(env: StreamExecutionEnvironment):
-      DataStream[(Int, Long, Int, String, Long)] = {
-
-    val data = new mutable.MutableList[(Int, Long, Int, String, Long)]
-    data.+=((1, 1L, 0, "Hallo", 1L))
-    data.+=((2, 2L, 1, "Hallo Welt", 2L))
-    data.+=((2, 3L, 2, "Hallo Welt wie", 1L))
-    data.+=((3, 4L, 3, "Hallo Welt wie gehts?", 2L))
-    data.+=((3, 5L, 4, "ABC", 2L))
-    data.+=((3, 6L, 5, "BCD", 3L))
-    data.+=((4, 7L, 6, "CDE", 2L))
-    data.+=((4, 8L, 7, "DEF", 1L))
-    data.+=((4, 9L, 8, "EFG", 1L))
-    data.+=((4, 10L, 9, "FGH", 2L))
-    data.+=((5, 11L, 10, "GHI", 1L))
-    data.+=((5, 12L, 11, "HIJ", 3L))
-    data.+=((5, 13L, 12, "IJK", 3L))
-    data.+=((5, 14L, 13, "JKL", 2L))
-    data.+=((5, 15L, 14, "KLM", 2L))
-    env.fromCollection(data)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
deleted file mode 100644
index 26cdc76..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.table.plan.PlanGenException
-import org.apache.flink.api.table.{TableEnvironment, Row}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import scala.collection.JavaConverters._
-import org.apache.flink.examples.scala.WordCountTable.{WC => MyWC}
-
-@RunWith(classOf[Parameterized])
-class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testAggregationTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
-
-    val results = t.toDataSet[Row].collect()
-    val expected = "231,1,21,21,11"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testAggregationOnNonExistingField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      // Must fail. Field 'foo does not exist.
-      .select('foo.avg)
-  }
-
-  @Test
-  def testWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
-      .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
-
-    val expected = "1,1,1,1,1.5,1.5,2"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testProjection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(
-      (1: Byte, 1: Short),
-      (2: Byte, 2: Short)).toTable(tEnv)
-      .select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
-
-    val expected = "1,3,2,1,3"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAggregationWithArithmetic(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
-      .select(('_1 + 2).avg + 2, '_2.count + 5)
-
-    val expected = "5.5,7"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAggregationWithTwoCount(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
-      .select('_1.count, '_2.count)
-
-    val expected = "2,2"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAggregationAfterProjection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
-      .select('_1, '_2, '_3)
-      .select('_1.avg, '_2.sum, '_3.count)
-
-    val expected = "1,3,2"
-    val result = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(result.asJava, expected)
-  }
-
-  @Test(expected = classOf[PlanGenException])
-  def testNonWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("Hello", 1)).toTable(tEnv)
-      // Must fail. Field '_1 is not a numeric type.
-      .select('_1.sum)
-
-    t.collect()
-  }
-
-  @Test(expected = classOf[UnsupportedOperationException])
-  def testNoNestedAggregations(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("Hello", 1)).toTable(tEnv)
-      // Must fail. Sum aggregation can not be chained.
-      .select('_2.sum.sum)
-  }
-
-  @Test
-  def testSQLStyleAggregations(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select(
-        """Sum( a) as a1, a.sum as a2,
-          |Min (a) as b1, a.min as b2,
-          |Max (a ) as c1, a.max as c2,
-          |Avg ( a ) as d1, a.avg as d2,
-          |Count(a) as e1, a.count as e2
-        """.stripMargin)
-
-    val expected = "231,231,1,1,21,21,11,11,21,21"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testPojoAggregation(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val input = env.fromElements(
-      MyWC("hello", 1),
-      MyWC("hello", 1),
-      MyWC("ciao", 1),
-      MyWC("hola", 1),
-      MyWC("hola", 1))
-    val expr = input.toTable(tEnv)
-    val result = expr
-      .groupBy('word)
-      .select('word, 'count.sum as 'count)
-      .filter('count === 2)
-      .toDataSet[MyWC]
-
-    val mappedResult = result.map(w => (w.word, w.count * 10)).collect()
-    val expected = "(hello,20)\n" + "(hola,20)"
-    TestBaseUtils.compareResultAsText(mappedResult.asJava, expected)
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
deleted file mode 100644
index af5e6d7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableEnvironment, Row}
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class CalcITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testSimpleCalc(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1, '_2, '_3)
-        .where('_1 < 7)
-        .select('_1, '_3)
-
-    val expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
-      "4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n"
-      val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCalcWithTwoFilters(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1, '_2, '_3)
-        .where('_1 < 7 && '_2 === 3)
-        .select('_1, '_3)
-        .where('_1 === 4)
-        .select('_1)
-
-    val expected = "4\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCalcWithAggregation(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1, '_2, '_3)
-        .where('_1 < 15)
-        .groupBy('_2)
-        .select('_1.min, '_2.count as 'cnt)
-        .where('cnt > 3)
-
-    val expected = "7,4\n" + "11,4\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCalcJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
-      .where('b > 1).select('a, 'd).where('d === 2)
-
-    val expected = "2,2\n" + "3,2\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
deleted file mode 100644
index c0499e4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import java.util.Date
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{TableEnvironment, Row}
-import org.apache.flink.api.table.codegen.CodeGenException
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testNumericAutoCastInArithmetic(): Unit = {
-
-    // don't test everything, just some common cast directions
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tEnv)
-      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
-
-    val expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testNumericAutoCastInComparison(): Unit = {
-
-    // don't test everything, just some common cast directions
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f)
-      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d  && 'f > 1)
-
-    val expected = "2,2,2,2,2.0,2.0"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-  @Ignore // TODO support advanced String operations
-  @Test
-  def testAutoCastToString(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable(tEnv)
-      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d", '_7 + "Date")
-
-    val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCasting(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements((1, 0.0, 1L, true))
-      .toTable(tEnv)
-      .select(
-        // * -> String
-        '_1.cast(BasicTypeInfo.STRING_TYPE_INFO),
-        '_2.cast(BasicTypeInfo.STRING_TYPE_INFO),
-        '_3.cast(BasicTypeInfo.STRING_TYPE_INFO),
-        '_4.cast(BasicTypeInfo.STRING_TYPE_INFO),
-        // NUMERIC TYPE -> Boolean
-        '_1.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO),
-        '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO),
-        '_3.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO),
-        // NUMERIC TYPE -> NUMERIC TYPE
-        '_1.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
-        '_2.cast(BasicTypeInfo.INT_TYPE_INFO),
-        '_3.cast(BasicTypeInfo.SHORT_TYPE_INFO),
-        // Boolean -> NUMERIC TYPE
-        '_4.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
-        // identity casting
-        '_1.cast(BasicTypeInfo.INT_TYPE_INFO),
-        '_2.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
-        '_3.cast(BasicTypeInfo.LONG_TYPE_INFO),
-        '_4.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO))
-
-    val expected = "1,0.0,1,true," +
-      "true,false,true," +
-      "1.0,0,1," +
-      "1.0," +
-      "1,0.0,1,true\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCastFromString(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("1", "true", "2.0"))
-      .toTable(tEnv)
-      .select(
-        // String -> BASIC TYPE (not String, Date, Void, Character)
-        '_1.cast(BasicTypeInfo.BYTE_TYPE_INFO),
-        '_1.cast(BasicTypeInfo.SHORT_TYPE_INFO),
-        '_1.cast(BasicTypeInfo.INT_TYPE_INFO),
-        '_1.cast(BasicTypeInfo.LONG_TYPE_INFO),
-        '_3.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
-        '_3.cast(BasicTypeInfo.FLOAT_TYPE_INFO),
-        '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO))
-
-    val expected = "1,1,1,1,2.0,2.0,true\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Ignore // Date types not supported yet
-  @Test
-  def testCastDateFromString(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775"))
-      .toTable(tEnv)
-      .select(
-        '_1.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
-        '_2.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
-        '_3.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
-        '_4.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
-
-    val expected = "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," +
-      "1970-01-17 17:47:53.775\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Ignore // Date types not supported yet
-  @Test
-  def testCastDateToStringAndLong(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000"))
-    val t = ds.toTable(tEnv)
-      .select('_1.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f0),
-        '_2.cast(BasicTypeInfo.DATE_TYPE_INFO).as('f1))
-      .select('f0.cast(BasicTypeInfo.STRING_TYPE_INFO),
-        'f0.cast(BasicTypeInfo.LONG_TYPE_INFO),
-        'f1.cast(BasicTypeInfo.STRING_TYPE_INFO),
-        'f1.cast(BasicTypeInfo.LONG_TYPE_INFO))
-
-    val expected = "2011-05-03 15:51:36.000,1304437896000," +
-      "2011-05-03 15:51:36.000,1304437896000\n"
-    val result = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(result.asJava, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
deleted file mode 100644
index fc1284e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableEnvironment, Row}
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testDistinct(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val distinct = ds.select('b).distinct()
-
-    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-    val results = distinct.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testDistinctAfterAggregate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-    val distinct = ds.groupBy('a, 'e).select('e).distinct()
-
-    val expected = "1\n" + "2\n" + "3\n"
-    val results = distinct.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
deleted file mode 100644
index 59b835c..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import java.util.Date
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.codegen.CodeGenException
-import org.apache.flink.api.table.expressions.Null
-import org.apache.flink.api.table.{TableEnvironment, Row}
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Assert._
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class ExpressionsITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testArithmetic(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((5, 10)).toTable(tEnv, 'a, 'b)
-      .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
-
-    val expected = "0,10,2,10,1,-5"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testLogic(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((5, true)).toTable(tEnv, 'a, 'b)
-      .select('b && true, 'b && false, 'b || false, !'b)
-
-    val expected = "true,false,true,false"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testComparisons(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((5, 5, 4)).toTable(tEnv, 'a, 'b, 'c)
-      .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
-
-    val expected = "true,true,false,false,true"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCaseInsensitiveForAs(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((3, 5.toByte)).toTable(tEnv, 'a, 'b)
-      .groupBy("a").select("a, a.count As cnt")
-
-    val expected = "3,1"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testNullLiteral(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((1, 0)).toTable(tEnv, 'a, 'b)
-      .select(
-        'a,
-        'b,
-        Null(BasicTypeInfo.INT_TYPE_INFO),
-        Null(BasicTypeInfo.STRING_TYPE_INFO) === "")
-
-    try {
-      val ds = t.toDataSet[Row]
-      if (!config.getNullCheck) {
-        fail("Exception expected if null check is disabled.")
-      }
-      val results = ds.collect()
-      val expected = "1,0,null,null"
-      TestBaseUtils.compareResultAsText(results.asJava, expected)
-    }
-    catch {
-      case e: CodeGenException =>
-        if (config.getNullCheck) {
-          throw e
-        }
-    }
-  }
-
-  @Test
-  def testEval(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((5, true)).toTable(tEnv, 'a, 'b)
-      .select(
-        ('b && true).eval("true", "false"),
-        false.eval("true", "false"),
-        true.eval(true.eval(true.eval(10, 4), 4), 4))
-
-    val expected = "true,false,10"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testEvalInvalidTypes(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((5, true)).toTable(tEnv, 'a, 'b)
-      .select(('b && true).eval(5, "false"))
-
-    val expected = "true,false,3,10"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  // Date literals not yet supported
-  @Ignore
-  @Test
-  def testDateLiteral(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((0L, "test")).toTable(tEnv, 'a, 'b)
-      .select('a,
-        Literal(new Date(0)).cast(BasicTypeInfo.STRING_TYPE_INFO),
-        'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
-
-    val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
deleted file mode 100644
index 51dfe74..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableEnvironment, Row}
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-
-@RunWith(classOf[Parameterized])
-class FilterITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-
-    val expected = "\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnStringTupleField(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val filterDs = ds.filter( 'c.like("%world%") )
-
-    val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-
-    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
-      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
-      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testNotEquals(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 !== 0)
-    val expected = "1,1,Hi\n" + "3,2,Hello world\n" +
-      "5,3,I am fine.\n" + "7,4,Comment#1\n" + "9,4,Comment#3\n" +
-      "11,5,Comment#5\n" + "13,5,Comment#7\n" + "15,5,Comment#9\n" +
-      "17,6,Comment#11\n" + "19,6,Comment#13\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testDisjunctivePredicate(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a < 2 || 'a > 20)
-    val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testConsecutiveFilters(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
-    val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
-      "9,4,Comment#3\n" + "17,6,Comment#11\n" +
-      "19,6,Comment#13\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterBasicType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.getStringDataSet(env)
-
-    val filterDs = ds.toTable(tEnv, 'a).filter( 'a.like("H%") )
-
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnCustomType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.getCustomTypeDataSet(env)
-    val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
-      .filter( 's.like("%a%") )
-
-    val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testFilterInvalidFieldName(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    // must fail. Field 'foo does not exist
-    ds.filter( 'foo === 2 )
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
deleted file mode 100644
index a9edbb0..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.table.{TableEnvironment, Row}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testGroupingOnNonExistentField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. '_foo not a valid field
-      .groupBy('_foo)
-      .select('a.avg)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testGroupingInvalidSelection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('a, 'b)
-      // must fail. 'c is not a grouping key or aggregation
-      .select('c)
-  }
-
-  @Test
-  def testGroupedAggregate(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum)
-
-    val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupingKeyForwardIfNotUsed(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum)
-
-    val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupNoAggregation(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum as 'd, 'b)
-      .groupBy('b, 'd)
-      .select('b)
-
-    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithLongKeys(): Unit = {
-    // This uses very long keys to force serialized comparison.
-    // With short keys, the normalized key is sufficient.
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = env.fromElements(
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2))
-      .rebalance().setParallelism(2).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('a, 'b)
-      .select('c.sum)
-
-    val expected = "10\n" + "8\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithConstant1(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 4 as 'four, 'b)
-      .groupBy('four, 'a)
-      .select('four, 'b.sum)
-
-    val expected = "4,2\n" + "4,3\n" + "4,5\n" + "4,5\n" + "4,5\n" + "4,6\n" +
-      "4,6\n" + "4,6\n" + "4,3\n" + "4,4\n" + "4,6\n" + "4,1\n" + "4,4\n" +
-      "4,4\n" + "4,5\n" + "4,6\n" + "4,2\n" + "4,3\n" + "4,4\n" + "4,5\n" + "4,6\n"
-    val results = t.toDataSet[Row].collect()
-
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithConstant2(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-        .select('b, 4 as 'four, 'a)
-        .groupBy('b, 'four)
-        .select('four, 'a.sum)
-
-    val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithExpression(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-        .groupBy('e, 'b % 3)
-        .select('c.min, 'e, 'a.avg, 'd.count)
-
-    val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" +
-        "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithFilter(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum)
-      .where('b === 2)
-
-    val expected = "2,5\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}


Mime
View raw message