flink-commits mailing list archives

From: twal...@apache.org
Subject: [23/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
Date: Thu, 13 Jul 2017 10:18:32 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/SetOperatorsITCase.scala
deleted file mode 100644
index 6b68665..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/SetOperatorsITCase.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.Random
-
-@RunWith(classOf[Parameterized])
-class SetOperatorsITCase(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  @Test
-  def testUnionAll(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f)
-
-    val unionDs = ds1.unionAll(ds2).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUnion(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f)
-
-    val unionDs = ds1.union(ds2).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTernaryUnionAll(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2).unionAll(ds3).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" +
-      "Hi\n" + "Hello\n" + "Hello world\n" +
-      "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTernaryUnion(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val unionDs = ds1.union(ds2).union(ds3).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testMinusAll(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'a, 'b, 'c)
-
-    val minusDs = ds1.unionAll(ds1).unionAll(ds1)
-      .minusAll(ds2.unionAll(ds2)).select('c)
-
-    val results = minusDs.toDataSet[Row].collect()
-    val expected = "Hi\n" +
-      "Hello\n" + "Hello world\n" +
-      "Hello\n" + "Hello world\n" +
-      "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testMinus(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'a, 'b, 'c)
-
-    val minusDs = ds1.unionAll(ds1).unionAll(ds1)
-      .minus(ds2.unionAll(ds2)).select('c)
-
-    val results = minusDs.toDataSet[Row].collect()
-    val expected = "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testMinusDifferentFieldNames(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'd, 'e, 'f)
-
-    val minusDs = ds1.unionAll(ds1).unionAll(ds1)
-      .minus(ds2.unionAll(ds2)).select('c)
-
-    val results = minusDs.toDataSet[Row].collect()
-    val expected = "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testIntersect(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Hi"))
-    data.+=((2, 2L, "Hello"))
-    data.+=((2, 2L, "Hello"))
-    data.+=((3, 2L, "Hello world!"))
-    val ds2 = env.fromCollection(Random.shuffle(data)).toTable(tEnv, 'a, 'b, 'c)
-
-    val intersectDS = ds1.intersect(ds2).select('c).toDataSet[Row]
-
-    val results = intersectDS.collect()
-
-    val expected = "Hi\n" + "Hello\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testIntersectAll(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data1 = new mutable.MutableList[Int]
-    data1 += (1, 1, 1, 2, 2)
-    val data2 = new mutable.MutableList[Int]
-    data2 += (1, 2, 2, 2, 3)
-    val ds1 = env.fromCollection(data1).toTable(tEnv, 'c)
-    val ds2 = env.fromCollection(data2).toTable(tEnv, 'c)
-
-    val intersectDS = ds1.intersectAll(ds2).select('c).toDataSet[Row]
-
-    val expected = "1\n2\n2"
-    val results = intersectDS.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testIntersectWithDifferentFieldNames(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'e, 'f, 'g)
-
-    val intersectDs = ds1.intersect(ds2).select('c)
-
-    val results = intersectDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testIntersectWithScalarExpression(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a + 1, 'b, 'c)
-    val ds2 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a + 1, 'b, 'c)
-
-    val intersectDs = ds1.intersect(ds2)
-
-    val results = intersectDs.toDataSet[Row].collect()
-    val expected = "2,1,Hi\n" + "3,2,Hello\n" + "4,2,Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}
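
For reference, a minimal standalone sketch of the union vs. unionAll semantics
the deleted tests exercised, assuming the same pre-1.4 batch Table API used
throughout this commit (getTableEnvironment, toTable, toDataSet):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object SetOpsSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val left = env.fromElements((1, "Hi"), (2, "Hello")).toTable(tEnv, 'a, 'b)
    val right = env.fromElements((1, "Hi"), (3, "Hello world")).toTable(tEnv, 'a, 'b)

    // unionAll keeps duplicates (bag semantics): four rows, "Hi" twice
    val all = left.unionAll(right).select('b).toDataSet[Row].collect()

    // union removes duplicates (set semantics): three distinct rows
    val distinct = left.union(right).select('b).toDataSet[Row].collect()

    println(s"unionAll: $all")
    println(s"union:    $distinct")
  }
}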

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/SortITCase.scala
deleted file mode 100644
index 1784d81..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/SortITCase.scala
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.table
-
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-@RunWith(classOf[Parameterized])
-class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
-    extends TableProgramsClusterTestBase(mode, configMode) {
-
-  private def getExecutionEnvironment = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    // set the parallelism explicitly to make sure the query is executed in
-    // a distributed manner
-    env.setParallelism(3)
-    env
-  }
-
-  @Test
-  def testOrderByDesc(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.desc)
-    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
-      - x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => {
-      // the rows need to be copied in object reuse mode
-      val copied = new mutable.ArrayBuffer[Row]
-      rows.foreach(r => copied += Row.copy(r))
-      Seq(copied)
-    }).collect()
-
-    val result = results
-        .filterNot(_.isEmpty)
-        // sort all partitions by their head element to verify the order across partitions
-        .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
-        .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByAsc(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc)
-    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => {
-      // the rows need to be copied in object reuse mode
-      val copied = new mutable.ArrayBuffer[Row]
-      rows.foreach(r => copied += Row.copy(r))
-      Seq(copied)
-    }).collect()
-
-    val result = results
-        .filterNot(_.isEmpty)
-        // sort all partitions by their head element to verify the order across partitions
-        .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
-        .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByMultipleFieldsDifferentDirections(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_2.asc, '_1.desc)
-    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
-      (x.productElement(1).asInstanceOf[Long], - x.productElement(0).asInstanceOf[Int]) )
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => {
-      // the rows need to be copied in object reuse mode
-      val copied = new mutable.ArrayBuffer[Row]
-      rows.foreach(r => copied += Row.copy(r))
-      Seq(copied)
-    }).collect()
-
-    def rowOrdering = Ordering.by((r : Row) => {
-      // ordering for this tuple falls back to the previously defined tupleOrdering,
-      // so we just need to return the fields in their defining sequence
-      (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
-    })
-
-    val result = results
-        .filterNot(_.isEmpty)
-        // sort all partitions by their head element to verify the order across partitions
-        .sortBy(_.head)(rowOrdering)
-        .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByOffset(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
-    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
-    // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => {
-      // the rows need to be copied in object reuse mode
-      val copied = new mutable.ArrayBuffer[Row]
-      rows.foreach(r => copied += Row.copy(r))
-      Seq(copied)
-    }).collect()
-
-    val result = results
-        .filterNot(_.isEmpty)
-        // sort all partitions by their head element to verify the order across partitions
-        .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
-        .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByOffsetAndFetch(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.desc).limit(3, 5)
-    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
-      - x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
-    // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => {
-      // the rows need to be copied in object reuse mode
-      val copied = new mutable.ArrayBuffer[Row]
-      rows.foreach(r => copied += Row.copy(r))
-      Seq(copied)
-    }).collect()
-
-    val result = results
-        .filterNot(_.isEmpty)
-        // sort all partitions by their head element to verify the order across partitions
-        .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
-        .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByFetch(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
-    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
-    // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => {
-      // the rows need to be copied in object reuse mode
-      val copied = new mutable.ArrayBuffer[Row]
-      rows.foreach(r => copied += Row.copy(r))
-      Seq(copied)
-    }).collect()
-
-    implicit def rowOrdering = Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int])
-
-    val result = results
-        .filterNot(_.isEmpty)
-        // sort all partitions by their head element to verify the order across partitions
-        .sortBy(_.head)
-        .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-}
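
The verification pattern above (squash each partition into one element, then
order the partitions by their head) can be shown on a plain DataSet as well;
a small sketch with illustrative data:

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

object SortCheckSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3) // force a distributed execution, as in the tests

    val sorted = env.fromCollection((1 to 21).map(i => (i, s"row-$i")))
      .partitionByRange(0)               // contiguous key ranges per partition
      .sortPartition(0, Order.ASCENDING) // sort within each partition

    // squash all rows inside a partition into one element
    val partitions = sorted.mapPartition(rows => Seq(rows.toList)).collect()

    val flat = partitions
      .filterNot(_.isEmpty)
      .sortBy(_.head._1) // sort partitions by their head element
      .reduceLeft(_ ++ _)

    // contiguous ranges plus sorted partitions imply a verifiable global order
    assert(flat.map(_._1) == (1 to 21).toList)
  }
}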

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/TableEnvironmentITCase.scala
deleted file mode 100644
index c7153dd..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/TableEnvironmentITCase.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.table
-
-import java.util
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class TableEnvironmentITCase(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  @Test
-  def testSimpleRegister(): Unit = {
-
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet(tableName, ds)
-    val t = tEnv.scan(tableName).select('_1, '_2, '_3)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testRegisterWithFields(): Unit = {
-
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
-    val t = tEnv.scan(tableName).select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
-      "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
-      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTableRegister(): Unit = {
-
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    tEnv.registerTable(tableName, t)
-
-    val regT = tEnv.scan(tableName).select('a, 'b).filter('a > 8)
-
-    val expected = "9,4\n" + "10,4\n" +
-      "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
-      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" +
-      "19,6\n" + "20,6\n" + "21,6\n"
-
-    val results = regT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testToTable(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testToTableFromCaseClass(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data = List(
-      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
-      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
-      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
-    val t =  env.fromCollection(data)
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-      .select('a, 'b, 'c, 'd)
-
-    val expected: String =
-      "Peter,28,4000.0,Sales\n" +
-      "Anna,56,10000.0,Engineering\n" +
-      "Lucy,42,6000.0,HR\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testToTableFromAndToCaseClass(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data = List(
-      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
-      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
-      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
-    val t =  env.fromCollection(data)
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-      .select('a, 'b, 'c, 'd)
-
-    val expected: String =
-      "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
-      "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
-      "SomeCaseClass(Lucy,42,6000.0,HR)\n"
-    val results = t.toDataSet[SomeCaseClass].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}
-
-object TableEnvironmentITCase {
-
-  @Parameterized.Parameters(name = "Table config = {0}")
-  def parameters(): util.Collection[Array[java.lang.Object]] = {
-    Seq[Array[AnyRef]](
-      Array(TableProgramsTestBase.DEFAULT)
-    ).asJava
-  }
-}
-
-case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
-  def this() { this("", 0, 0.0, "") }
-}
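
The register-then-scan flow these tests covered boils down to a few lines; a
compact sketch with a hypothetical case class, using the same pre-1.4 API
(registerDataSet, scan):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

case class Employee(name: String, age: Int)

object RegisterSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val ds = env.fromElements(Employee("Peter", 28), Employee("Anna", 56))

    // register the DataSet under a name, with explicit field names
    tEnv.registerDataSet("People", ds, 'name, 'age)

    // scan the registered table and apply relational operators
    val result = tEnv.scan("People").filter('age > 30).select('name)

    result.toDataSet[Row].collect().foreach(println) // prints: Anna
  }
}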

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/TableSinkITCase.scala
deleted file mode 100644
index c8f26ec..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/TableSinkITCase.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.table
-
-import java.io.File
-
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.sinks.CsvTableSink
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-
-@RunWith(classOf[Parameterized])
-class TableSinkITCase(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  @Test
-  def testBatchTableSink(): Unit = {
-
-    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
-    tmpFile.deleteOnExit()
-    val path = tmpFile.toURI.toString
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    env.setParallelism(4)
-
-    val input = CollectionDataSets.get3TupleDataSet(env)
-      .map(x => x).setParallelism(4) // increase DOP to 4
-
-    val results = input.toTable(tEnv, 'a, 'b, 'c)
-      .where('a < 5 || 'a > 17)
-      .select('c, 'b)
-      .writeToSink(new CsvTableSink(path, fieldDelim = "|"))
-
-    env.execute()
-
-    val expected = Seq(
-      "Hi|1", "Hello|2", "Hello world|2", "Hello world, how are you?|3",
-      "Comment#12|6", "Comment#13|6", "Comment#14|6", "Comment#15|6").mkString("\n")
-
-    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
-  }
-
-}
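
A trimmed-down sketch of the CsvTableSink usage verified above; the temp-file
path and delimiter are illustrative, and writeToSink is the pre-1.4 emission
API used in the test:

import java.io.File

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

object SinkSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val tmp = File.createTempFile("sink-sketch", ".csv")
    tmp.deleteOnExit()

    env.fromElements((1, "Hi"), (2, "Hello"))
      .toTable(tEnv, 'id, 'text)
      .select('text, 'id)
      .writeToSink(new CsvTableSink(tmp.toURI.toString, fieldDelim = "|"))

    env.execute() // nothing is written until execute() runs the job
  }
}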

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/TableSourceITCase.scala
deleted file mode 100644
index fb93f26..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/table/TableSourceITCase.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.table
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.utils.{CommonTestData, TestFilterableTableSource}
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class TableSourceITCase(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  @Test
-  def testCsvTableSourceWithProjection(): Unit = {
-    val csvTable = CommonTestData.getCsvTableSource
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    tEnv.registerTableSource("csvTable", csvTable)
-
-    val results = tEnv
-      .scan("csvTable")
-      .where('score < 20)
-      .select('last, 'id.floor(), 'score * 2)
-      .collect()
-
-    val expected = Seq(
-      "Smith,1,24.6",
-      "Miller,3,15.78",
-      "Smith,4,0.24",
-      "Miller,6,13.56",
-      "Williams,8,4.68").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTableSourceWithFilterable(): Unit = {
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    tableEnv.registerTableSource(tableName, new TestFilterableTableSource)
-    val results = tableEnv
-      .scan(tableName)
-      .where("amount > 4 && price < 9")
-      .select("id, name")
-      .collect()
-
-    val expected = Seq(
-      "5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}
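
Registering a CsvTableSource follows the same scan/where/select shape as
testCsvTableSourceWithProjection above; a sketch with a made-up file path and
schema:

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

object SourceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // hypothetical CSV file with two columns: last,score
    val source = new CsvTableSource(
      "/tmp/people.csv",
      Array("last", "score"),
      Array[TypeInformation[_]](
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO))

    tEnv.registerTableSource("csvTable", source)

    // projections and filters can be pushed towards the source scan
    val results = tEnv.scan("csvTable")
      .where('score < 20)
      .select('last, 'score * 2)
      .toDataSet[Row]
      .collect()

    results.foreach(println)
  }
}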

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/StreamITCase.scala
deleted file mode 100644
index e04bd75..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/StreamITCase.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream
-
-import java.util.Collections
-
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.types.Row
-import org.junit.Assert._
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-object StreamITCase {
-
-  var testResults: mutable.MutableList[String] = mutable.MutableList.empty[String]
-  var retractedResults: ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
-
-  def clear = {
-    StreamITCase.testResults.clear()
-    StreamITCase.retractedResults.clear()
-  }
-
-  def compareWithList(expected: java.util.List[String]): Unit = {
-    Collections.sort(expected)
-    assertEquals(expected.asScala, StreamITCase.testResults.sorted)
-  }
-
-  final class StringSink[T] extends RichSinkFunction[T]() {
-    def invoke(value: T) {
-      testResults.synchronized {
-        testResults += value.toString
-      }
-    }
-  }
-
-  final class RetractMessagesSink extends RichSinkFunction[(Boolean, Row)]() {
-    def invoke(v: (Boolean, Row)) {
-      testResults.synchronized {
-        testResults += (if (v._1) "+" else "-") + v._2
-      }
-    }
-  }
-
-  final class RetractingSink() extends RichSinkFunction[(Boolean, Row)] {
-    def invoke(v: (Boolean, Row)) {
-      retractedResults.synchronized {
-        val value = v._2.toString
-        if (v._1) {
-          retractedResults += value
-        } else {
-          val idx = retractedResults.indexOf(value)
-          if (idx >= 0) {
-            retractedResults.remove(idx)
-          } else {
-            throw new RuntimeException("Tried to retract a value that wasn't added first. " +
-              "This is probably an incorrectly implemented test. " +
-              "Try to set the parallelism of the sink to 1.")
-          }
-        }
-      }
-    }
-  }
-
-}
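
How the StringSink helper above is typically wired into a streaming test: the
sink appends every emitted row to the shared buffer, which is inspected after
execute() returns. A sketch against the object as defined above:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.datastream.StreamITCase
import org.apache.flink.types.Row

object StreamSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear // reset the shared buffer between tests

    val t = env.fromElements((1, "Hi"), (2, "Hello")).toTable(tEnv, 'a, 'b)

    t.toAppendStream[Row].addSink(new StreamITCase.StringSink[Row])
    env.execute()

    // one string per emitted row, e.g. "1,Hi" and "2,Hello"
    println(StreamITCase.testResults.sorted)
  }
}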

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/StreamingWithStateTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/StreamingWithStateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/StreamingWithStateTestBase.scala
deleted file mode 100644
index c32dd00..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/StreamingWithStateTestBase.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.datastream
-
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Rule
-import org.junit.rules.TemporaryFolder
-
-class StreamingWithStateTestBase extends StreamingMultipleProgramsTestBase {
-
-  val _tempFolder = new TemporaryFolder
-
-  @Rule
-  def tempFolder: TemporaryFolder = _tempFolder
-
-  def getStateBackend: RocksDBStateBackend = {
-    val dbPath = tempFolder.newFolder().getAbsolutePath
-    val checkpointPath = tempFolder.newFolder().toURI.toString
-    val backend = new RocksDBStateBackend(checkpointPath)
-    backend.setDbStoragePath(dbPath)
-    backend
-  }
-}
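
The base class boils down to handing Flink a RocksDB backend with a
checkpoint URI and a local storage path; a sketch with illustrative /tmp
paths standing in for the JUnit TemporaryFolder:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object StateBackendSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // checkpoints go to the URI, RocksDB working files to the storage path
    val backend = new RocksDBStateBackend("file:///tmp/checkpoints")
    backend.setDbStoragePath("/tmp/rocksdb")

    env.setStateBackend(backend)
    // ... define and execute the streaming job as in the tests above
  }
}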

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/OverWindowITCase.scala
deleted file mode 100644
index 15828eb..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/OverWindowITCase.scala
+++ /dev/null
@@ -1,838 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.sql
-
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
-import org.apache.flink.table.api.scala.stream.utils.StreamTestData
-import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit._
-
-import scala.collection.mutable
-
-class OverWindowITCase extends StreamingWithStateTestBase {
-
-  val data = List(
-    (1L, 1, "Hello"),
-    (2L, 2, "Hello"),
-    (3L, 3, "Hello"),
-    (4L, 4, "Hello"),
-    (5L, 5, "Hello"),
-    (6L, 6, "Hello"),
-    (7L, 7, "Hello World"),
-    (8L, 8, "Hello World"),
-    (20L, 20, "Hello World"))
-
-  @Test
-  def testProcTimeBoundedPartitionedRowsOver(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-    tEnv.registerTable("MyTable", t)
-
-    val sqlQuery = "SELECT a, " +
-      "  SUM(c) OVER (" +
-      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " +
-      "  MIN(c) OVER (" +
-      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
-      "FROM MyTable"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,0,0",
-      "2,1,1",
-      "2,3,1",
-      "3,3,3",
-      "3,7,3",
-      "3,12,3",
-      "4,6,6",
-      "4,13,6",
-      "4,21,6",
-      "4,30,6",
-      "5,10,10",
-      "5,21,10",
-      "5,33,10",
-      "5,46,10",
-      "5,60,10")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testProcTimeBoundedNonPartitionedRowsOver(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-    tEnv.registerTable("MyTable", t)
-
-    val sqlQuery = "SELECT a, " +
-      "  SUM(c) OVER (" +
-      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW), " +
-      "  MIN(c) OVER (" +
-      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
-      "FROM MyTable"
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,0,0",
-      "2,1,0",
-      "2,3,0",
-      "3,6,0",
-      "3,10,0",
-      "3,15,0",
-      "4,21,0",
-      "4,28,0",
-      "4,36,0",
-      "4,45,0",
-      "5,55,0",
-      "5,66,1",
-      "5,77,2",
-      "5,88,3",
-      "5,99,4")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testProcTimeUnboundedPartitionedRangeOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    // ensure a consistent element order across runs for the sum aggregation
-    env.setParallelism(1)
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
-      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
-      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testProcTimeUnboundedPartitionedRowsOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT c, cnt1 from " +
-      "(SELECT " +
-      "c, " +
-      "count(a) " +
-      " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
-      "as cnt1 from T1)"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello World,1", "Hello World,2", "Hello World,3",
-      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-
-  }
-
-  @Test
-  def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
-    val queryConfig =
-      new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(2), Time.hours(3))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    // ensure a consistent element order across runs for the sum aggregation
-    env.setParallelism(1)
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime  RANGE UNBOUNDED preceding), " +
-      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
-      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testProcTimeUnboundedNonPartitionedRowsOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
-      "from T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
-    val data = Seq(
-      Left((1500L, (1L, 15, "Hello"))),
-      Left((1600L, (1L, 16, "Hello"))),
-      Left((1000L, (1L, 1, "Hello"))),
-      Left((2000L, (2L, 2, "Hello"))),
-      Right(1000L),
-      Left((2000L, (2L, 2, "Hello"))),
-      Left((2000L, (2L, 3, "Hello"))),
-      Left((3000L, (3L, 3, "Hello"))),
-      Right(2000L),
-      Left((4000L, (4L, 4, "Hello"))),
-      Right(3000L),
-      Left((5000L, (5L, 5, "Hello"))),
-      Right(5000L),
-      Left((6000L, (6L, 6, "Hello"))),
-      Left((6500L, (6L, 65, "Hello"))),
-      Right(7000L),
-      Left((9000L, (6L, 9, "Hello"))),
-      Left((9500L, (6L, 18, "Hello"))),
-      Left((9000L, (6L, 9, "Hello"))),
-      Right(10000L),
-      Left((10000L, (7L, 7, "Hello World"))),
-      Left((11000L, (7L, 17, "Hello World"))),
-      Left((11000L, (7L, 77, "Hello World"))),
-      Right(12000L),
-      Left((14000L, (7L, 18, "Hello World"))),
-      Right(14000L),
-      Left((15000L, (8L, 8, "Hello World"))),
-      Right(17000L),
-      Left((20000L, (20L, 20, "Hello World"))),
-      Right(19000L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "  c, b, " +
-      "  COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
-      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
-      "  SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
-      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
-      " FROM T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
-      "Hello,3,4,9",
-      "Hello,4,2,7",
-      "Hello,5,2,9",
-      "Hello,6,2,11", "Hello,65,2,12",
-      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
-      "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
-      "Hello World,8,2,15",
-      "Hello World,20,1,20")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeBoundedPartitionedRowsOver(): Unit = {
-    val data = Seq(
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((3L, (7L, 7, "Hello World"))),
-      Left((1L, (7L, 7, "Hello World"))),
-      Left((1L, (7L, 7, "Hello World"))),
-      Right(2L),
-      Left((3L, (3L, 3, "Hello"))),
-      Left((4L, (4L, 4, "Hello"))),
-      Left((5L, (5L, 5, "Hello"))),
-      Left((6L, (6L, 6, "Hello"))),
-      Left((20L, (20L, 20, "Hello World"))),
-      Right(6L),
-      Left((8L, (8L, 8, "Hello World"))),
-      Left((7L, (7L, 7, "Hello World"))),
-      Right(20L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      " c, a, " +
-      "  COUNT(a) " +
-      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
-      "  SUM(a) " +
-      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
-      "FROM T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
-      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
-      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
-      "Hello,6,3,15",
-      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
-      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeBoundedNonPartitionedRangeOver(): Unit = {
-    val data = Seq(
-      Left((1500L, (1L, 15, "Hello"))),
-      Left((1600L, (1L, 16, "Hello"))),
-      Left((1000L, (1L, 1, "Hello"))),
-      Left((2000L, (2L, 2, "Hello"))),
-      Right(1000L),
-      Left((2000L, (2L, 2, "Hello"))),
-      Left((2000L, (2L, 3, "Hello"))),
-      Left((3000L, (3L, 3, "Hello"))),
-      Right(2000L),
-      Left((4000L, (4L, 4, "Hello"))),
-      Right(3000L),
-      Left((5000L, (5L, 5, "Hello"))),
-      Right(5000L),
-      Left((6000L, (6L, 6, "Hello"))),
-      Left((6500L, (6L, 65, "Hello"))),
-      Right(7000L),
-      Left((9000L, (6L, 9, "Hello"))),
-      Left((9500L, (6L, 18, "Hello"))),
-      Left((9000L, (6L, 9, "Hello"))),
-      Right(10000L),
-      Left((10000L, (7L, 7, "Hello World"))),
-      Left((11000L, (7L, 17, "Hello World"))),
-      Left((11000L, (7L, 77, "Hello World"))),
-      Right(12000L),
-      Left((14000L, (7L, 18, "Hello World"))),
-      Right(14000L),
-      Left((15000L, (8L, 8, "Hello World"))),
-      Right(17000L),
-      Left((20000L, (20L, 20, "Hello World"))),
-      Right(19000L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "  c, b, " +
-      "  COUNT(a) " +
-      "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
-      "  SUM(a) " +
-      "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
-      " FROM T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
-      "Hello,3,4,9",
-      "Hello,4,2,7",
-      "Hello,5,2,9",
-      "Hello,6,2,11", "Hello,65,2,12",
-      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
-      "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
-      "Hello World,8,2,15",
-      "Hello World,20,1,20")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeBoundedNonPartitionedRowsOver(): Unit = {
-
-    val data = Seq(
-      Left((2L, (2L, 2, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((20L, (20L, 20, "Hello World"))), // early row
-      Right(3L),
-      Left((2L, (2L, 2, "Hello"))), // late row
-      Left((3L, (3L, 3, "Hello"))),
-      Left((4L, (4L, 4, "Hello"))),
-      Left((5L, (5L, 5, "Hello"))),
-      Left((6L, (6L, 6, "Hello"))),
-      Left((7L, (7L, 7, "Hello World"))),
-      Right(7L),
-      Left((9L, (9L, 9, "Hello World"))),
-      Left((8L, (8L, 8, "Hello World"))),
-      Left((8L, (8L, 8, "Hello World"))),
-      Right(20L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env
-      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT " +
-      "c, a, " +
-      "  COUNT(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW), " +
-      "  SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
-      "FROM T1"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
-      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
-      "Hello,3,3,7",
-      "Hello,4,3,9", "Hello,5,3,12",
-      "Hello,6,3,15", "Hello World,7,3,18",
-      "Hello World,8,3,21", "Hello World,8,3,23",
-      "Hello World,9,3,25",
-      "Hello World,20,3,37")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "  SUM(b) OVER (" +
-      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  COUNT(b) OVER (" +
-      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  AVG(b) OVER (" +
-      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  MAX(b) OVER (" +
-      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  MIN(b) OVER (" +
-      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
-      "FROM T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (1, 1L, "Hello")),
-      Left(14000002L, (1, 2L, "Hello")),
-      Left(14000002L, (1, 3L, "Hello world")),
-      Left(14000003L, (2, 2L, "Hello world")),
-      Left(14000003L, (2, 3L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 4L, "Hello world")),
-      Left(14000022L, (1, 5L, "Hello world")),
-      Left(14000022L, (1, 6L, "Hello world")),
-      Left(14000022L, (1, 7L, "Hello world")),
-      Left(14000023L, (2, 4L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Right(14000030L))
-
-    val t1 = env
-      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,1,Hello,6,3,2,3,1",
-      "1,2,Hello,6,3,2,3,1",
-      "1,3,Hello world,6,3,2,3,1",
-      "1,1,Hi,7,4,1,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,6,3,2,3,1",
-      "2,3,Hello world,6,3,2,3,1",
-      "1,4,Hello world,11,5,2,4,1",
-      "1,5,Hello world,29,8,3,7,1",
-      "1,6,Hello world,29,8,3,7,1",
-      "1,7,Hello world,29,8,3,7,1",
-      "2,4,Hello world,15,5,3,5,1",
-      "2,5,Hello world,15,5,3,5,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeUnBoundedPartitionedRowsOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "count(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "avg(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "max(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "min(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (3, 1L, "Hello")),
-      Left(14000003L, (1, 2L, "Hello")),
-      Left(14000004L, (1, 3L, "Hello world")),
-      Left(14000007L, (3, 2L, "Hello world")),
-      Left(14000008L, (2, 2L, "Hello world")),
-      Right(14000010L),
-      Left(14000012L, (1, 5L, "Hello world")),
-      Left(14000021L, (1, 6L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Right(14000020L),
-      Left(14000024L, (3, 5L, "Hello world")),
-      Left(14000026L, (1, 7L, "Hello world")),
-      Left(14000025L, (1, 8L, "Hello world")),
-      Left(14000022L, (1, 9L, "Hello world")),
-      Right(14000030L))
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-             .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,2,Hello,2,1,2,2,2",
-      "1,3,Hello world,5,2,2,3,2",
-      "1,1,Hi,6,3,2,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,3,2,1,2,1",
-      "3,1,Hello,1,1,1,1,1",
-      "3,2,Hello world,3,2,1,2,1",
-      "1,5,Hello world,11,4,2,5,1",
-      "1,6,Hello world,17,5,3,6,1",
-      "1,9,Hello world,26,6,4,9,1",
-      "1,8,Hello world,34,7,4,9,1",
-      "1,7,Hello world,41,8,5,9,1",
-      "2,5,Hello world,8,3,2,5,1",
-      "3,5,Hello world,8,3,2,5,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeUnBoundedNonPartitionedRangeOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "  SUM(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  COUNT(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  AVG(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  MAX(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  MIN(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
-      "FROM T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (1, 1L, "Hello")),
-      Left(14000002L, (1, 2L, "Hello")),
-      Left(14000002L, (1, 3L, "Hello world")),
-      Left(14000003L, (2, 2L, "Hello world")),
-      Left(14000003L, (2, 3L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 4L, "Hello world")),
-      Left(14000022L, (1, 5L, "Hello world")),
-      Left(14000022L, (1, 6L, "Hello world")),
-      Left(14000022L, (1, 7L, "Hello world")),
-      Left(14000023L, (2, 4L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Right(14000030L))
-
-    val t1 = env
-      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "2,1,Hello,1,1,1,1,1",
-      "1,1,Hello,7,4,1,3,1",
-      "1,2,Hello,7,4,1,3,1",
-      "1,3,Hello world,7,4,1,3,1",
-      "2,2,Hello world,12,6,2,3,1",
-      "2,3,Hello world,12,6,2,3,1",
-      "1,1,Hi,13,7,1,3,1",
-      "1,4,Hello world,17,8,2,4,1",
-      "1,5,Hello world,35,11,3,7,1",
-      "1,6,Hello world,35,11,3,7,1",
-      "1,7,Hello world,35,11,3,7,1",
-      "2,4,Hello world,44,13,3,7,1",
-      "2,5,Hello world,44,13,3,7,1")
-
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeUnBoundedNonPartitionedRowsOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "  SUM(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  COUNT(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  AVG(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  MAX(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
-      "  MIN(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
-      "FROM T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 2L, "Hello")),
-      Left(14000002L, (3, 5L, "Hello")),
-      Left(14000003L, (1, 3L, "Hello")),
-      Left(14000004L, (3, 7L, "Hello world")),
-      Left(14000007L, (4, 9L, "Hello world")),
-      Left(14000008L, (5, 8L, "Hello world")),
-      Right(14000010L),
-      // this element will be discarded because it is late:
-      // its timestamp 14000008 is behind the watermark 14000010
-      Left(14000008L, (6, 8L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (6, 8L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env
-      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "2,2,Hello,2,1,2,2,2",
-      "3,5,Hello,7,2,3,5,2",
-      "1,3,Hello,10,3,3,5,2",
-      "3,7,Hello world,17,4,4,7,2",
-      "1,1,Hi,18,5,3,7,1",
-      "4,9,Hello world,27,6,4,9,1",
-      "5,8,Hello world,35,7,5,9,1",
-      "6,8,Hello world,43,8,5,9,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test event-time unbounded partitioned ROWS over window with late elements **/
-  @Test
-  def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
-    val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "count(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "avg(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "max(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row), " +
-      "min(b) over (" +
-      "partition by a order by rowtime rows between unbounded preceding and current row) " +
-      "from T1"
-
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (3, 1L, "Hello")),
-      Left(14000003L, (1, 2L, "Hello")),
-      Left(14000004L, (1, 3L, "Hello world")),
-      Left(14000007L, (3, 2L, "Hello world")),
-      Left(14000008L, (2, 2L, "Hello world")),
-      Right(14000010L),
-      // the next 3 elements are late
-      Left(14000008L, (1, 4L, "Hello world")),
-      Left(14000008L, (2, 3L, "Hello world")),
-      Left(14000008L, (3, 3L, "Hello world")),
-      Left(14000012L, (1, 5L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 6L, "Hello world")),
-      // the next 3 elements are late
-      Left(14000019L, (1, 6L, "Hello world")),
-      Left(14000018L, (2, 4L, "Hello world")),
-      Left(14000018L, (3, 4L, "Hello world")),
-      Left(14000022L, (2, 5L, "Hello world")),
-      Left(14000022L, (3, 5L, "Hello world")),
-      Left(14000024L, (1, 7L, "Hello world")),
-      Left(14000023L, (1, 8L, "Hello world")),
-      Left(14000021L, (1, 9L, "Hello world")),
-      Right(14000030L)
-    )
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,2,Hello,2,1,2,2,2",
-      "1,3,Hello world,5,2,2,3,2",
-      "1,1,Hi,6,3,2,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,3,2,1,2,1",
-      "3,1,Hello,1,1,1,1,1",
-      "3,2,Hello world,3,2,1,2,1",
-      "1,5,Hello world,11,4,2,5,1",
-      "1,6,Hello world,17,5,3,6,1",
-      "1,9,Hello world,26,6,4,9,1",
-      "1,8,Hello world,34,7,4,9,1",
-      "1,7,Hello world,41,8,5,9,1",
-      "2,5,Hello world,8,3,2,5,1",
-      "3,5,Hello world,8,3,2,5,1"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-}

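The over-window tests above interleave records and watermarks in a single Seq:
Left((ts, record)) is a record with event timestamp ts, Right(wm) a watermark.
A minimal sketch of a source with that contract (the class and parameter names
here are illustrative, not the actual EventTimeSourceFunction in the test
utilities):

    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.watermark.Watermark

    // Emits Left((ts, v)) as a record with event timestamp ts
    // and Right(wm) as a watermark with timestamp wm.
    class SketchEventTimeSource[T](data: Seq[Either[(Long, T), Long]])
      extends SourceFunction[T] {

      override def run(ctx: SourceFunction.SourceContext[T]): Unit = data.foreach {
        case Left((ts, value)) => ctx.collectWithTimestamp(value, ts)
        case Right(wm)         => ctx.emitWatermark(new Watermark(wm))
      }

      override def cancel(): Unit = {}
    }

Note also the two bound types exercised above: RANGE bounds compare event
timestamps, so rows with equal timestamps share one result, while ROWS bounds
count physical rows.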
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/SqlITCase.scala
deleted file mode 100644
index ec719d2..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/SqlITCase.scala
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.sql
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.StreamTestData
-import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit._
-
-class SqlITCase extends StreamingWithStateTestBase {
-
-  /** test registering a table from a DataStream of Rows **/
-  @Test
-  def testRowRegister(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT * FROM MyTableRow WHERE c < 3"
-
-    val data = List(
-      Row.of("Hello", "Worlds", Int.box(1)),
-      Row.of("Hello", "Hiden", Int.box(5)),
-      Row.of("Hello again", "Worlds", Int.box(2)))
-
-    implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO) // tpe is automatically picked up as the implicit TypeInformation by fromCollection
-
-    val ds = env.fromCollection(data)
-
-    val t = ds.toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTableRow", t)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("Hello,Worlds,1","Hello again,Worlds,2")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test unbounded groupBy (without window) **/
-  @Test
-  def testUnboundedGroupBy(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT b, COUNT(a) FROM MyTable GROUP BY b"
-
-    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-
-    val result = tEnv.sql(sqlQuery).toRetractStream[Row]
-    result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
-    env.execute()
-
-    val expected = List("1,1", "2,2", "3,3", "4,4", "5,5", "6,6")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  /** test selection **/
-  @Test
-  def testSelectExpressionFromTable(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable"
-
-    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("2,0", "4,1", "6,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test filtering with registered table **/
-  @Test
-  def testSimpleFilter(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE a = 3"
-
-    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test filtering with registered datastream **/
-  @Test
-  def testDatastreamFilter(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
-
-    val t = StreamTestData.getSmall3TupleDataStream(env)
-    tEnv.registerDataStream("MyTable", t)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test union with registered tables **/
-  @Test
-  def testUnion(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT * FROM T1 " +
-      "UNION ALL " +
-      "SELECT * FROM T2"
-
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T1", t1)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T2", t2)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,1,Hi", "1,1,Hi",
-      "2,2,Hello", "2,2,Hello",
-      "3,2,Hello world", "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test union with filter **/
-  @Test
-  def testUnionWithFilter(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " +
-      "UNION ALL " +
-      "SELECT * FROM T2 WHERE a = 2"
-
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T1", t1)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T2", t2)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "2,2,Hello",
-      "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test union of a table and a datastream **/
-  @Test
-  def testUnionTableWithDataStream(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " +
-      "UNION ALL " +
-      "SELECT c FROM T2 WHERE a = 2"
-
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T1", t1)
-    val t2 = StreamTestData.get3TupleDataStream(env)
-    tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("Hello", "Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
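-  /** test unnesting a primitive array: one output row per array element **/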
-  @Test
-  def testUnnestPrimitiveArrayFromTable(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val data = List(
-      (1, Array(12, 45), Array(Array(12, 45))),
-      (2, Array(41, 5), Array(Array(18), Array(87))),
-      (3, Array(18, 42), Array(Array(1), Array(45)))
-    )
-    val stream = env.fromCollection(data)
-    tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,[12, 45],12",
-      "1,[12, 45],45",
-      "2,[41, 5],41",
-      "2,[41, 5],5",
-      "3,[18, 42],18",
-      "3,[18, 42],42"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
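-  /** test unnesting an array of arrays: only the outer level is unnested **/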
-  @Test
-  def testUnnestArrayOfArrayFromTable(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val data = List(
-      (1, Array(12, 45), Array(Array(12, 45))),
-      (2, Array(41, 5), Array(Array(18), Array(87))),
-      (3, Array(18, 42), Array(Array(1), Array(45)))
-    )
-    val stream = env.fromCollection(data)
-    tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "1,[12, 45]",
-      "2,[18]",
-      "2,[87]",
-      "3,[1]",
-      "3,[45]")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
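-  /** test unnesting an object (tuple) array with a filter on the unnested fields **/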
-  @Test
-  def testUnnestObjectArrayFromTableWithFilter(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val data = List(
-      (1, Array((12, "45.6"), (12, "45.612"))),
-      (2, Array((13, "41.6"), (14, "45.2136"))),
-      (3, Array((18, "42.6")))
-    )
-    val stream = env.fromCollection(data)
-    tEnv.registerDataStream("T", stream, 'a, 'b)
-
-    val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List(
-      "2,[(13,41.6), (14,45.2136)],14,45.2136",
-      "3,[(18,42.6)],18,42.6")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-}
-

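A note on the two conversion modes used in SqlITCase above: most of its queries
are append-only, but the non-windowed GROUP BY continuously updates earlier
results, which is why testUnboundedGroupBy converts with toRetractStream and a
RetractingSink. A sketch of the distinction, assuming the table environment and
MyTable registration from those tests:

    // Append-only: each input row can only add output rows.
    val appended = tEnv.sql("SELECT a * 2, b - 1 FROM MyTable").toAppendStream[Row]

    // Updating: COUNT per key changes over time; toRetractStream yields
    // (Boolean, Row) pairs where true adds a row and false retracts one.
    val retracted = tEnv.sql("SELECT b, COUNT(a) FROM MyTable GROUP BY b")
      .toRetractStream[Row]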
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/TableSourceITCase.scala
deleted file mode 100644
index 31e7c5a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/TableSourceITCase.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.datastream.StreamITCase
-import org.apache.flink.table.utils.CommonTestData
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class TableSourceITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testCsvTableSource(): Unit = {
-
-    val csvTable = CommonTestData.getCsvTableSource
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    tEnv.registerTableSource("persons", csvTable)
-
-    tEnv.sql(
-      "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
-      .toAppendStream[Row]
-      .addSink(new StreamITCase.StringSink[Row])
-
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,Mike,Smith,12.3",
-      "2,Bob,Taylor,45.6",
-      "3,Sam,Miller,7.89")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-}

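For context on the helper used above: CommonTestData.getCsvTableSource returns a
CsvTableSource over a small persons file. A hypothetical construction matching
the schema the query implies (the path is made up and this is not the helper's
actual code):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.table.sources.CsvTableSource

    // Schema inferred from "SELECT id, `first`, `last`, score" and the expected
    // results: Int, String, String, Double. The file path is illustrative only.
    val csvTable = new CsvTableSource(
      "/tmp/persons.csv",
      Array("id", "first", "last", "score"),
      Array[TypeInformation[_]](
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO))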
