flink-commits mailing list archives

From: twal...@apache.org
Subject: [40/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
Date: Thu, 13 Jul 2017 10:18:49 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/JoinStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/JoinStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/JoinStringExpressionTest.scala
new file mode 100644
index 0000000..2302cb9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/JoinStringExpressionTest.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.utils.TableTestBase
+import org.junit._
+
+class JoinStringExpressionTest extends TableTestBase {
+
+  @Test
+  def testJoin(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1Scala = ds1.join(ds2).where('b === 'e).select('c, 'g)
+    val t1Java = ds1.join(ds2).where("b === e").select("c, g")
+
+    verifyTableEquals(t1Scala, t1Java)
+  }
+
+  @Test
+  def testJoinWithFilter(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1Scala = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
+    val t1Java = ds1.join(ds2).where("b === e && b < 2").select("c, g")
+
+    verifyTableEquals(t1Scala, t1Java)
+  }
+
+  @Test
+  def testJoinWithJoinFilter(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1Scala = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
+    val t1Java = ds1.join(ds2).where("b === e && a < 6 && h < b").select("c, g")
+
+    verifyTableEquals(t1Scala, t1Java)
+  }
+
+  @Test
+  def testJoinWithMultipleKeys(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1Scala = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
+    val t1Java = ds1.join(ds2).filter("a === d && b === h").select("c, g")
+
+    verifyTableEquals(t1Scala, t1Java)
+  }
+
+  @Test
+  def testJoinWithAggregation(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1Scala = ds1.join(ds2).where('a === 'd).select('g.count)
+    val t1Java = ds1.join(ds2).where("a === d").select("g.count")
+
+    verifyTableEquals(t1Scala, t1Java)
+  }
+
+  @Test
+  def testJoinWithGroupedAggregation(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.join(ds2)
+      .where('a === 'd)
+      .groupBy('a, 'd)
+      .select('b.sum, 'g.count)
+    val t2 = ds1.join(ds2)
+      .where("a = d")
+      .groupBy("a, d")
+      .select("b.sum, g.count")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testJoinPushThroughJoin(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds3 = util.addTable[(Int, Long, String)]("Table4",'j, 'k, 'l)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.join(ds2)
+      .where(Literal(true))
+      .join(ds3)
+      .where('a === 'd && 'e === 'k)
+      .select('a, 'f, 'l)
+    val t2 = ds1.join(ds2)
+      .where("true")
+      .join(ds3)
+      .where("a === d && e === k")
+      .select("a, f, l")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testJoinWithDisjunctivePred(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10)).select('c, 'g)
+    val t2 = ds1.join(ds2).filter("a = d && (b = e || b = e - 10)").select("c, g")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testJoinWithExpressionPreds(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
+    val t2 = ds1.join(ds2).filter("b = h + 1 && a - 1 = d + 2").select("c, g")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testLeftJoinWithMultipleKeys(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
+    val t2 = ds1.leftOuterJoin(ds2, "a = d && b = h").select("c, g")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testRightJoinWithMultipleKeys(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
+    val t2 = ds1.rightOuterJoin(ds2, "a = d && b = h").select("c, g")
+
+    verifyTableEquals(t1, t2)
+  }
+
+  @Test
+  def testFullOuterJoinWithMultipleKeys(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
+    val t2 = ds1.fullOuterJoin(ds2, "a = d && b = h").select("c, g")
+
+    verifyTableEquals(t1, t2)
+  }
+}
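
The tests above verify that the Scala expression DSL and the Java string-expression
parser produce identical logical plans; verifyTableEquals compares the plans without
executing them. A minimal sketch of the pattern, assuming the same TableTestBase
utilities used above (table name and expressions are illustrative only):

    val util = batchTestUtil()
    val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)

    // Both variants must translate to the same logical plan.
    val scalaVariant = t.filter('a > 0).select('c, 'b + 1 as 'b1)
    val javaVariant  = t.filter("a > 0").select("c, b + 1 as b1")

    verifyTableEquals(scalaVariant, javaVariant)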

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/AggregateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/AggregateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/AggregateValidationTest.scala
new file mode 100644
index 0000000..57d5488
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/AggregateValidationTest.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit._
+
+class AggregateValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testNonWorkingAggregationDataTypes(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(String, Int)]("Table2")
+
+    // Must fail. Field '_1 is not a numeric type.
+    t.select('_1.sum)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNoNestedAggregations(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(String, Int)]("Table2")
+
+    // Must fail. Sum aggregations cannot be chained.
+    t.select('_2.sum.sum)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingOnNonExistentField(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    // must fail. '_foo is not a valid field
+    t.groupBy('_foo).select('a.avg)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingInvalidSelection(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    t.groupBy('a, 'b)
+    // must fail. 'c is not a grouping key or aggregation
+    .select('c)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testAggregationOnNonExistingField(): Unit = {
+
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    // Must fail. Field 'foo does not exist.
+    t.select('foo.avg)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testInvalidUdAggArgs() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val myWeightedAvg = new WeightedAvgWithMergeAndReset
+
+    // must fail. UDAGG does not accept String type
+    t.select(myWeightedAvg('c, 'a))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testGroupingInvalidUdAggArgs() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val myWeightedAvg = new WeightedAvgWithMergeAndReset
+
+    t.groupBy('b)
+    // must fail. UDAGG does not accept String type
+    .select(myWeightedAvg('c, 'a))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testGroupingNestedUdAgg() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val myWeightedAvg = new WeightedAvgWithMergeAndReset
+
+    t.groupBy('c)
+    // must fail. Aggregation functions cannot be nested
+    .select(myWeightedAvg(myWeightedAvg('b, 'a), 'a))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testAggregationOnNonExistingFieldJava() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    t.select("foo.avg")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testNonWorkingAggregationDataTypesJava() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Long, String)]("Table2",'b, 'c)
+    // Must fail. Cannot compute SUM aggregate on String field.
+    t.select("c.sum")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testNoNestedAggregationsJava() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Long, String)]("Table2",'b, 'c)
+    // Must fail. Aggregation on aggregation is not allowed.
+    t.select("b.sum.sum")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testGroupingOnNonExistentFieldJava() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    // must fail. Field foo is not in input
+    t.groupBy("foo")
+    .select("a.avg")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testGroupingInvalidSelectionJava() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    t.groupBy("a, b")
+    // must fail. Field c is not a grouping key or aggregation
+    .select("c")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testUnknownUdAggJava() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    // must fail. The function "unknown" is not registered
+    t.select("unknown(c)")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testGroupingUnknownUdAggJava() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    t.groupBy("a, b")
+    // must fail. The function "unknown" is not registered
+    .select("unknown(c)")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testInvalidUdAggArgsJava() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val myWeightedAvg = new WeightedAvgWithMergeAndReset
+    util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+
+    // must fail. UDAGG does not accept String type
+    t.select("myWeightedAvg(c, a)")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testGroupingInvalidUdAggArgsJava() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    val myWeightedAvg = new WeightedAvgWithMergeAndReset
+    util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+
+    t.groupBy("b")
+    // must fail. UDAGG does not accept String type
+    .select("myWeightedAvg(c, a)")
+  }
+}
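
Every case above follows the same shape: the Table API validates expressions eagerly
when an operator is applied, so the ValidationException is thrown before any plan is
translated or executed, and JUnit's expected attribute is enough to assert it. A
minimal sketch of the pattern, inside a TableTestBase subclass (test name hypothetical):

    @Test(expected = classOf[ValidationException])
    def testSumOnStringField(): Unit = {
      val util = batchTestUtil()
      val t = util.addTable[(String, Int)]("T", 'a, 'b)

      // Throws during validation: SUM is not defined on String.
      t.select('a.sum)
    }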

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala
new file mode 100644
index 0000000..b37339a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+class CalcValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectInvalidFieldFields(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+      // must fail. Field 'foo does not exist
+      .select('a, 'foo)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectAmbiguousRenaming(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+      // must fail. 'a and 'b are both renamed to 'foo
+      .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectAmbiguousRenaming2(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+      // must fail. 'b is renamed to 'a, which collides with the existing 'a
+      .select('a, 'b as 'a).toDataSet[Row].print()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFilterInvalidFieldName(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+    // must fail. Field 'foo does not exist
+    t.filter( 'foo === 2 )
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectInvalidField() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+    // Must fail. Field foo does not exist
+    t.select("a + 1, foo + 2")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectAmbiguousFieldNames() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+    // Must fail. Alias foo is used twice.
+    t.select("a + 1 as foo, b + 2 as foo")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFilterInvalidField() {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+    // Must fail. Field foo does not exist.
+    t.filter("foo = 17")
+  }
+
+  @Test
+  def testAliasStarException(): Unit = {
+    val util = batchTestUtil()
+
+    try {
+      util.addTable[(Int, Long, String)]("Table1", '*, 'b, 'c)
+      fail("TableException expected")
+    } catch {
+      case _: TableException => //ignore
+    }
+
+    try {
+      util.addTable[(Int, Long, String)]("Table2")
+      .select('_1 as '*, '_2 as 'b, '_1 as 'c)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+
+    try {
+      util.addTable[(Int, Long, String)]("Table3").as('*, 'b, 'c)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+    try {
+      util.addTable[(Int, Long, String)]("Table4", 'a, 'b, 'c).select('*, 'b)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testDuplicateFlattening(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    table.select('a.flatten(), 'a.flatten())
+  }
+}
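
testDuplicateFlattening fails because flattening the same composite field twice would
produce duplicate output columns. Flattening a composite field once is legal and
expands it into one column per component; a sketch of the valid counterpart (the
generated column names such as a$_1 are indicative only):

    val util = batchTestUtil()
    val table = util.addTable[((Int, Long), String)]("MyTable", 'a, 'c)

    // Legal: 'a expands into its components (e.g. a$_1, a$_2).
    table.select('a.flatten(), 'c)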

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala
new file mode 100644
index 0000000..7cf1b82
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class GroupWindowValidationTest extends TableTestBase {
+
+  //===============================================================================================
+  // Common test
+  //===============================================================================================
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupByWithoutWindowAlias(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowTimeRef(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+      .window(Slide over 5.milli every 1.milli on 'int as 'w2) // 'int does not exist in the input anymore.
+      .groupBy('w2)
+      .select('string)
+  }
+
+  //===============================================================================================
+  // Tumbling Windows
+  //===============================================================================================
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidProcessingTimeDefinition(): Unit = {
+    val util = batchTestUtil()
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidProcessingTimeDefinition2(): Unit = {
+    val util = batchTestUtil()
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidEventTimeDefinition(): Unit = {
+    val util = batchTestUtil()
+    // definition must not extend schema
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    table
+      .window(Tumble over 2.minutes on 'rowtime as 'w)
+      .groupBy('w, 'long)
+      // invalid function arguments
+      .select(myWeightedAvg('int, 'string))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testAllTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    table
+      .window(Tumble over 2.minutes on 'rowtime as 'w)
+      .groupBy('w)
+      // invalid function arguments
+      .select(myWeightedAvg('int, 'string))
+  }
+
+  //===============================================================================================
+  // Sliding Windows
+  //===============================================================================================
+
+  @Test(expected = classOf[ValidationException])
+  def testSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    table
+      .window(Slide over 2.minutes every 1.minute on 'rowtime as 'w)
+      .groupBy('w, 'long)
+      // invalid function arguments
+      .select(myWeightedAvg('int, 'string))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testAllSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    table
+      .window(Slide over 2.minutes every 1.minute on 'long as 'w)
+      .groupBy('w)
+      // invalid function arguments
+      .select(myWeightedAvg('int, 'string))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    table
+      .window(Session withGap 2.minutes on 'rowtime as 'w)
+      .groupBy('w, 'long)
+      // invalid function arguments
+      .select(myWeightedAvg('int, 'string))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testAllSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    table
+      .window(Session withGap 2.minutes on 'rowtime as 'w)
+      .groupBy('w)
+      // invalid function arguments
+      .select(myWeightedAvg('int, 'string))
+  }
+}
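
For contrast, a well-formed batch group window uses an existing column of the input as
the window column and lists the window alias in groupBy. A sketch of the valid
counterpart to the failing cases above, using the same utilities:

    val util = batchTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)

    table
      .window(Tumble over 5.milli on 'long as 'w) // batch windows sort on a Long/Timestamp column
      .groupBy('w, 'string)                       // the window alias must be a grouping key
      .select('string, 'int.count)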

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala
new file mode 100644
index 0000000..3cc278b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.types.Row
+import org.junit._
+
+class JoinValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinNonExistingKey(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    ds1.join(ds2)
+      // must fail. Field 'foo does not exist
+      .where('foo === 'e)
+      .select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinWithNonMatchingKeyTypes(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    ds1.join(ds2)
+      // must fail. Field 'a is Int, and 'g is String
+      .where('a === 'g)
+      .select('c, 'g)
+
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinWithAmbiguousFields(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'c)
+
+    ds1.join(ds2)
+      // must fail. Both inputs share the same field 'c
+      .where('a === 'd)
+      .select('c, 'g)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testNoEqualityJoinPredicate1(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    ds1.join(ds2)
+      // must fail. 'd and 'f are both fields of ds2, so there is no equi-join predicate between the inputs
+      .where('d === 'f)
+      .select('c, 'g)
+      .toDataSet[Row]
+  }
+
+  @Test(expected = classOf[TableException])
+  def testNoEqualityJoinPredicate2(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    ds1.join(ds2)
+      // must fail. No equality join predicate
+      .where('a < 'd)
+      .select('c, 'g)
+      .toDataSet[Row]
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNoJoinCondition(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNoEquiJoin(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testRightJoinWithNonEquiJoinPredicate(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testLeftJoinWithLocalPredicate(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFullJoinWithLocalPredicate(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testRightJoinWithLocalPredicate(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testLeftJoinWithNonEquiJoinPredicate(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFullJoinWithNonEquiJoinPredicate(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
+    ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinTablesFromDifferentEnvs(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val in1 = tEnv1.fromDataSet(ds1, 'a, 'b, 'c)
+    val in2 = tEnv2.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    in1.join(in2).where('b === 'e).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinTablesFromDifferentEnvsJava() {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val in1 = tEnv1.fromDataSet(ds1, 'a, 'b, 'c)
+    val in2 = tEnv2.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c)
+    // Must fail. Tables are bound to different TableEnvironments.
+    in1.join(in2).where("a === d").select("g.count")
+  }
+}
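
The last two cases fail because both join inputs must be bound to the same
TableEnvironment. The working setup registers both tables against a single
environment and keeps at least one cross-input equality predicate; sketched with the
same CollectionDataSets helpers:

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val in1 = tEnv.fromDataSet(CollectionDataSets.getSmall3TupleDataSet(env), 'a, 'b, 'c)
    val in2 = tEnv.fromDataSet(CollectionDataSets.get5TupleDataSet(env), 'd, 'e, 'f, 'g, 'h)

    // Valid: same environment, equi-join predicate between the inputs.
    in1.join(in2).where('b === 'e).select('c, 'g)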

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala
new file mode 100644
index 0000000..df8f7df
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.OverAgg0
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit._
+
+class OverWindowValidationTest extends TableTestBase {
+
+  /**
+    * An OVER clause is necessary for the [[OverAgg0]] window function.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidOverAggregation(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
+    val overAgg = new OverAgg0
+    t.select('c.count, overAgg('b, 'a))
+  }
+
+  /**
+    * An OVER clause is necessary for the [[OverAgg0]] window function.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidOverAggregation2(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val overAgg = new OverAgg0
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('string,'w)
+      .select(overAgg('long, 'int))
+  }
+}
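
OverAgg0 is an aggregate function that may only appear inside an OVER window, so both
plain selection and group-window usage are rejected. On a streaming table the legal
form looks roughly like the following sketch (over windows are not supported on batch
tables in this version; the field names and the proctime attribute are illustrative):

    val util = streamTestUtil()
    val t = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)

    t.window(Over partitionBy 'string orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
      .select('string, 'int.count over 'w)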

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SetOperatorsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SetOperatorsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SetOperatorsValidationTest.scala
new file mode 100644
index 0000000..814298a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SetOperatorsValidationTest.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit._
+
+class SetOperatorsValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionDifferentColumnSize(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'd, 'c, 'e)
+
+    // must fail. Union inputs have a different number of columns.
+    ds1.unionAll(ds2)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionDifferentFieldTypes(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    // must fail. Union inputs have different field types.
+    ds1.unionAll(ds2)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionTablesFromDifferentEnvs(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.unionAll(ds2).select('c)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMinusDifferentFieldTypes(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    // must fail. Minus inputs have different field types.
+    ds1.minus(ds2)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMinusAllTablesFromDifferentEnvs(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.minusAll(ds2).select('c)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testIntersectWithDifferentFieldTypes(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    // must fail. Intersect inputs have different field types.
+    ds1.intersect(ds2)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testIntersectTablesFromDifferentEnvs(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.intersect(ds2).select('c)
+  }
+}
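
All set operators above (unionAll, minus/minusAll, intersect) impose the same two
requirements: both inputs must have the same number of columns with identical field
types, and both must come from the same TableEnvironment. The valid shape, sketched:

    val util = batchTestUtil()
    val ds1 = util.addTable[(Int, Long, String)]("T1", 'a, 'b, 'c)
    val ds2 = util.addTable[(Int, Long, String)]("T2", 'a, 'b, 'c)

    // Valid: identical arity and field types, one environment.
    ds1.unionAll(ds2).select('c)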

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
new file mode 100644
index 0000000..5064687
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit._
+
+class SortValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testFetchWithoutOrder(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    ds.limit(0, 5)
+  }
+}
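
limit(offset, fetch) is only defined on a sorted table, so it must follow an orderBy.
The valid counterpart, sketched with the same utilities:

    val util = batchTestUtil()
    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)

    // Valid: fetch 5 rows, starting at offset 0, after an explicit sort.
    ds.orderBy('a.asc).limit(0, 5)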

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentTest.scala
deleted file mode 100644
index f492995..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentTest.scala
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
-import org.apache.flink.table.runtime.types.CRowTypeInfo
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
-import org.apache.flink.types.Row
-import org.junit.Assert.assertEquals
-import org.junit.Test
-
-class TableEnvironmentTest extends TableTestBase {
-
-  val tEnv = new MockTableEnvironment
-
-  val tupleType = new TupleTypeInfo(
-    INT_TYPE_INFO,
-    STRING_TYPE_INFO,
-    DOUBLE_TYPE_INFO)
-
-  val rowType = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
-
-  val cRowType = new CRowTypeInfo(rowType)
-
-  val caseClassType: TypeInformation[CClass] = implicitly[TypeInformation[CClass]]
-
-  val pojoType: TypeInformation[PojoClass] = TypeExtractor.createTypeInfo(classOf[PojoClass])
-
-  val atomicType = INT_TYPE_INFO
-
-  val genericRowType = new GenericTypeInfo[Row](classOf[Row])
-
-  @Test
-  def testGetFieldInfoRow(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(rowType)
-
-    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoRowNames(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      rowType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoTuple(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(tupleType)
-
-    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoCClass(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(caseClassType)
-
-    fieldInfo._1.zip(Array("cf1", "cf2", "cf3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoPojo(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(pojoType)
-
-    fieldInfo._1.zip(Array("pf1", "pf2", "pf3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoAtomic(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(atomicType)
-
-    fieldInfo._1.zip(Array("f0")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoTupleNames(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      tupleType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoCClassNames(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      caseClassType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoPojoNames2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        UnresolvedFieldReference("pf3"),
-        UnresolvedFieldReference("pf1"),
-        UnresolvedFieldReference("pf2")
-      ))
-
-    fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoAtomicName1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      atomicType,
-      Array(UnresolvedFieldReference("name")))
-
-    fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoTupleAlias1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      tupleType,
-      Array(
-        Alias(UnresolvedFieldReference("f0"), "name1"),
-        Alias(UnresolvedFieldReference("f1"), "name2"),
-        Alias(UnresolvedFieldReference("f2"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoTupleAlias2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      tupleType,
-      Array(
-        Alias(UnresolvedFieldReference("f2"), "name1"),
-        Alias(UnresolvedFieldReference("f0"), "name2"),
-        Alias(UnresolvedFieldReference("f1"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoCClassAlias1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      caseClassType,
-      Array(
-        Alias(UnresolvedFieldReference("cf1"), "name1"),
-        Alias(UnresolvedFieldReference("cf2"), "name2"),
-        Alias(UnresolvedFieldReference("cf3"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoCClassAlias2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      caseClassType,
-      Array(
-        Alias(UnresolvedFieldReference("cf3"), "name1"),
-        Alias(UnresolvedFieldReference("cf1"), "name2"),
-        Alias(UnresolvedFieldReference("cf2"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoPojoAlias1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        Alias(UnresolvedFieldReference("pf1"), "name1"),
-        Alias(UnresolvedFieldReference("pf2"), "name2"),
-        Alias(UnresolvedFieldReference("pf3"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoPojoAlias2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        Alias(UnresolvedFieldReference("pf3"), "name1"),
-        Alias(UnresolvedFieldReference("pf1"), "name2"),
-        Alias(UnresolvedFieldReference("pf2"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testSqlWithoutRegisteringForBatchTables(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
-
-    val sqlTable = util.tableEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a, b, c"),
-      term("where", ">(b, 12)"))
-
-    util.verifyTable(sqlTable, expected)
-
-    val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
-
-    val sqlTable2 = util.tableEnv.sql(s"SELECT d, e, f FROM $table, $table2 WHERE c = d")
-
-    val join = unaryNode(
-      "DataSetJoin",
-      binaryNode(
-        "DataSetCalc",
-        batchTableNode(0),
-        batchTableNode(1),
-        term("select", "c")),
-      term("where", "=(c, d)"),
-      term("join", "c, d, e, f"),
-      term("joinType", "InnerJoin"))
-
-    val expected2 = unaryNode(
-      "DataSetCalc",
-      join,
-      term("select", "d, e, f"))
-
-    util.verifyTable(sqlTable2, expected2)
-  }
-
-  @Test
-  def testSqlWithoutRegisteringForStreamTables(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
-
-    val sqlTable = util.tableEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select", "a, b, c"),
-      term("where", ">(b, 12)"))
-
-    util.verifyTable(sqlTable, expected)
-
-    val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
-
-    val sqlTable2 = util.tableEnv.sql(s"SELECT d, e, f FROM $table2 " +
-        s"UNION ALL SELECT a, b, c FROM $table")
-
-    val expected2 = binaryNode(
-      "DataStreamUnion",
-      streamTableNode(1),
-      streamTableNode(0),
-      term("union all", "d, e, f"))
-
-    util.verifyTable(sqlTable2, expected2)
-  }
-
-}
-
-case class CClass(cf1: Int, cf2: String, cf3: Double)
-
-class PojoClass(var pf1: Int, var pf2: String, var pf3: Double) {
-  def this() = this(0, "", 0.0)
-}
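
Note on the two testSqlWithoutRegistering* cases removed above: they exercise
the convenience that interpolating a Table into a SQL string registers it
under an auto-generated name, so no explicit registerTable call is needed.
A minimal, runnable sketch of that pattern (the data and object name are
illustrative, not part of the commit):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object SqlWithoutRegistration {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val table = env.fromElements((1L, 42, "hello")).toTable(tEnv, 'a, 'b, 'c)

    // Interpolating `table` invokes Table.toString, which registers the
    // table under a generated unique name that the SQL parser can resolve.
    val result = tEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
    result.toDataSet[(Long, Int, String)].print()
  }
}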

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSchemaTest.scala
deleted file mode 100644
index 57e32ad..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSchemaTest.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.Test
-
-class TableSchemaTest extends TableTestBase {
-
-  @Test
-  def testBatchTableSchema(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, String)]("MyTable", 'a, 'b)
-    val schema = table.getSchema
-
-    assertEquals("a", schema.getColumnNames.apply(0))
-    assertEquals("b", schema.getColumnNames.apply(1))
-
-    assertEquals(Types.INT, schema.getTypes.apply(0))
-    assertEquals(Types.STRING, schema.getTypes.apply(1))
-
-    val expectedString = "root\n" +
-      " |-- a: Integer\n" +
-      " |-- b: String\n"
-    assertEquals(expectedString, schema.toString)
-
-    assertTrue(schema.getColumnName(3).isEmpty)
-    assertTrue(schema.getType(-1).isEmpty)
-    assertTrue(schema.getType("c").isEmpty)
-  }
-
-  @Test
-  def testStreamTableSchema(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, String)]("MyTable", 'a, 'b)
-    val schema = table.getSchema
-
-    assertEquals("a", schema.getColumnNames.apply(0))
-    assertEquals("b", schema.getColumnNames.apply(1))
-
-    assertEquals(Types.INT, schema.getTypes.apply(0))
-    assertEquals(Types.STRING, schema.getTypes.apply(1))
-
-    val expectedString = "root\n" +
-      " |-- a: Integer\n" +
-      " |-- b: String\n"
-    assertEquals(expectedString, schema.toString)
-
-    assertTrue(schema.getColumnName(3).isEmpty)
-    assertTrue(schema.getType(-1).isEmpty)
-    assertTrue(schema.getType("c").isEmpty)
-  }
-
-}
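
The deleted assertions above pin down the TableSchema contract: the positional
getters return raw arrays, while lookups by index or name are Option-valued.
A compact sketch of the same accessors (the data is made up for illustration):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object SchemaInspection {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val schema = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b).getSchema

    println(schema.getColumnNames.mkString(", "))  // a, b
    println(schema.getTypes.mkString(", "))        // Integer, String
    // Out-of-range indices and unknown names yield None, not an exception.
    println(schema.getColumnName(3))               // None
    println(schema.getType(-1))                    // None
    println(schema.getType("c"))                   // None
  }
}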

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsTest.scala
deleted file mode 100644
index b987e40..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsTest.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
-  * Tests for aggregate plans.
-  */
-class AggregationsTest extends TableTestBase {
-
-  @Test
-  def testAggregate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable"
-
-    val setValues = unaryNode(
-      "DataSetValues",
-      batchTableNode(0),
-      tuples(List(null, null, null)),
-      term("values", "a", "b", "c")
-    )
-    val union = unaryNode(
-      "DataSetUnion",
-      setValues,
-      term("union","a","b","c")
-    )
-
-    val aggregate = unaryNode(
-      "DataSetAggregate",
-      union,
-      term("select",
-        "AVG(a) AS EXPR$0",
-        "SUM(b) AS EXPR$1",
-        "COUNT(c) AS EXPR$2")
-    )
-    util.verifySql(sqlQuery, aggregate)
-  }
-
-  @Test
-  def testAggregateWithFilter(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1"
-
-    val calcNode = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", "=(a, 1)")
-    )
-
-    val setValues = unaryNode(
-      "DataSetValues",
-      calcNode,
-      tuples(List(null, null, null)),
-      term("values", "a", "b", "c")
-    )
-
-    val union = unaryNode(
-      "DataSetUnion",
-      setValues,
-      term("union","a","b","c")
-    )
-
-    val aggregate = unaryNode(
-      "DataSetAggregate",
-      union,
-      term("select",
-        "AVG(a) AS EXPR$0",
-        "SUM(b) AS EXPR$1",
-        "COUNT(c) AS EXPR$2")
-    )
-    util.verifySql(sqlQuery, aggregate)
-  }
-
-  @Test
-  def testGroupAggregate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable GROUP BY a"
-
-    val aggregate = unaryNode(
-        "DataSetAggregate",
-        batchTableNode(0),
-        term("groupBy", "a"),
-        term("select",
-          "a",
-          "AVG(a) AS EXPR$0",
-          "SUM(b) AS EXPR$1",
-          "COUNT(c) AS EXPR$2")
-    )
-    val expected = unaryNode(
-        "DataSetCalc",
-        aggregate,
-        term("select",
-          "EXPR$0",
-          "EXPR$1",
-          "EXPR$2")
-    )
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testGroupAggregateWithFilter(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1 GROUP BY a"
-
-    val calcNode = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select","a", "b", "c") ,
-      term("where","=(a, 1)")
-    )
-
-    val aggregate = unaryNode(
-        "DataSetAggregate",
-        calcNode,
-        term("groupBy", "a"),
-        term("select",
-          "a",
-          "AVG(a) AS EXPR$0",
-          "SUM(b) AS EXPR$1",
-          "COUNT(c) AS EXPR$2")
-    )
-    val expected = unaryNode(
-        "DataSetCalc",
-        aggregate,
-        term("select",
-          "EXPR$0",
-          "EXPR$1",
-          "EXPR$2")
-    )
-    util.verifySql(sqlQuery, expected)
-  }
-
-}
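
All of these plan tests follow one pattern: assemble the expected optimized
plan as a string from the TableTestUtil helpers (unaryNode, binaryNode, term,
tuples, batchTableNode) and let verifySql compare it with the plan the
optimizer actually emits. The DataSetValues/DataSetUnion pair wrapping every
global aggregate is the planner's way of guaranteeing a result on empty input:
a single null row is unioned in before aggregating. A minimal sketch of a test
in this style (query and expected plan are illustrative, not from the commit):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test

class FilterPlanTest extends TableTestBase {

  @Test
  def testFilterPlan(): Unit = {
    val util = batchTestUtil()
    util.addTable[(Int, Long)]("MyTable", 'a, 'b)

    // batchTableNode(0) renders the scan of the first registered table;
    // each term(...) renders one attribute of the RelNode's string form.
    val expected = unaryNode(
      "DataSetCalc",
      batchTableNode(0),
      term("select", "a", "b"),
      term("where", ">(a, 0)"))

    util.verifySql("SELECT a, b FROM MyTable WHERE a > 0", expected)
  }
}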

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CompositeFlatteningTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CompositeFlatteningTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CompositeFlatteningTest.scala
deleted file mode 100644
index 6ae48f8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CompositeFlatteningTest.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.sql
-
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-
-class CompositeFlatteningTest extends TableTestBase {
-
-  @Test
-  def testMultipleFlattening(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "a._1 AS _1",
-        "a._2 AS _2",
-        "c",
-        "b._1 AS _10",
-        "b._2 AS _20"
-      )
-    )
-
-    util.verifySql(
-      "SELECT MyTable.a.*, c, MyTable.b.* FROM MyTable",
-      expected)
-  }
-
-}
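
For context on the expected names above: flattening a composite field with
".*" produces one output column per component, and the clash between a's and
b's component names (_1, _2) is resolved with numeric suffixes, hence _10 and
_20. A runnable sketch of the same query, with made-up data:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object FlatteningExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val data = env.fromElements(((1, 2L), ("x", true), "c"))
    tEnv.registerTable("MyTable", data.toTable(tEnv, 'a, 'b, 'c))

    // a.* expands to a._1 and a._2; b.* would collide on the same names,
    // so the planner derives the suffixed aliases seen in the plan above.
    val result = tEnv.sql("SELECT MyTable.a.*, c, MyTable.b.* FROM MyTable")
    result.toDataSet[Row].print()
  }
}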

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala
deleted file mode 100644
index 1f7ce84..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class DataSetSingleRowJoinTest extends TableTestBase {
-
-  @Test
-  def testSingleRowCrossJoin(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Int)]("A", 'a1, 'a2)
-
-    val query =
-      "SELECT a1, asum " +
-      "FROM A, (SELECT sum(a1) + sum(a2) AS asum FROM A)"
-
-    val expected =
-      binaryNode(
-        "DataSetSingleRowJoin",
-        unaryNode(
-          "DataSetCalc",
-          batchTableNode(0),
-          term("select", "a1")
-        ),
-        unaryNode(
-          "DataSetCalc",
-          unaryNode(
-            "DataSetAggregate",
-            unaryNode(
-              "DataSetUnion",
-              unaryNode(
-                "DataSetValues",
-                batchTableNode(0),
-                tuples(List(null, null)),
-                term("values", "a1", "a2")
-              ),
-              term("union","a1","a2")
-            ),
-            term("select", "SUM(a1) AS $f0", "SUM(a2) AS $f1")
-          ),
-          term("select", "+($f0, $f1) AS asum")
-        ),
-        term("where", "true"),
-        term("join", "a1", "asum"),
-        term("joinType", "NestedLoopInnerJoin")
-      )
-
-    util.verifySql(query, expected)
-  }
-
-  @Test
-  def testSingleRowEquiJoin(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, String)]("A", 'a1, 'a2)
-
-    val query =
-      "SELECT a1, a2 " +
-      "FROM A, (SELECT count(a1) AS cnt FROM A) " +
-      "WHERE a1 = cnt"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        binaryNode(
-          "DataSetSingleRowJoin",
-          batchTableNode(0),
-          unaryNode(
-            "DataSetAggregate",
-            unaryNode(
-              "DataSetUnion",
-              unaryNode(
-                "DataSetValues",
-                unaryNode(
-                  "DataSetCalc",
-                  batchTableNode(0),
-                  term("select", "a1")
-                ),
-                tuples(List(null)),
-                term("values", "a1")
-              ),
-              term("union","a1")
-            ),
-            term("select", "COUNT(a1) AS cnt")
-          ),
-          term("where", "=(CAST(a1), cnt)"),
-          term("join", "a1", "a2", "cnt"),
-          term("joinType", "NestedLoopInnerJoin")
-        ),
-        term("select", "a1", "a2")
-      )
-
-    util.verifySql(query, expected)
-  }
-
-  @Test
-  def testSingleRowNotEquiJoin(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, String)]("A", 'a1, 'a2)
-
-    val query =
-      "SELECT a1, a2 " +
-      "FROM A, (SELECT count(a1) AS cnt FROM A) " +
-      "WHERE a1 < cnt"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        binaryNode(
-          "DataSetSingleRowJoin",
-          batchTableNode(0),
-          unaryNode(
-            "DataSetAggregate",
-            unaryNode(
-              "DataSetUnion",
-              unaryNode(
-                "DataSetValues",
-                unaryNode(
-                  "DataSetCalc",
-                  batchTableNode(0),
-                  term("select", "a1")
-                ),
-                tuples(List(null)),
-                term("values", "a1")
-              ),
-              term("union", "a1")
-            ),
-            term("select", "COUNT(a1) AS cnt")
-          ),
-          term("where", "<(a1, cnt)"),
-          term("join", "a1", "a2", "cnt"),
-          term("joinType", "NestedLoopInnerJoin")
-        ),
-        term("select", "a1", "a2")
-      )
-
-    util.verifySql(query, expected)
-  }
-
-  @Test
-  def testSingleRowJoinWithComplexPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long)]("A", 'a1, 'a2)
-    util.addTable[(Int, Long)]("B", 'b1, 'b2)
-
-    val query =
-      "SELECT a1, a2, b1, b2 " +
-        "FROM A, (SELECT min(b1) AS b1, max(b2) AS b2 FROM B) " +
-        "WHERE a1 < b1 AND a2 = b2"
-
-    val expected = binaryNode(
-      "DataSetSingleRowJoin",
-      batchTableNode(0),
-      unaryNode(
-        "DataSetAggregate",
-        unaryNode(
-          "DataSetUnion",
-          unaryNode(
-            "DataSetValues",
-            batchTableNode(1),
-            tuples(List(null, null)),
-            term("values", "b1", "b2")
-          ),
-          term("union","b1","b2")
-        ),
-        term("select", "MIN(b1) AS b1", "MAX(b2) AS b2")
-      ),
-      term("where", "AND(<(a1, b1)", "=(a2, b2))"),
-      term("join", "a1", "a2", "b1", "b2"),
-      term("joinType", "NestedLoopInnerJoin")
-    )
-
-    util.verifySql(query, expected)
-  }
-
-  @Test
-  def testRightSingleLeftJoinEqualPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Long, Int)]("A", 'a1, 'a2)
-    util.addTable[(Int, Int)]("B", 'b1, 'b2)
-
-    val queryLeftJoin =
-      "SELECT a2 " +
-        "FROM A " +
-        "  LEFT JOIN " +
-        "(SELECT COUNT(*) AS cnt FROM B) AS x " +
-        "  ON a1 = cnt"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetSingleRowJoin",
-          batchTableNode(0),
-          term("where", "=(a1, cnt)"),
-          term("join", "a1", "a2", "cnt"),
-          term("joinType", "NestedLoopLeftJoin")
-        ),
-        term("select", "a2")
-      ) + "\n" +
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetUnion",
-            unaryNode(
-              "DataSetValues",
-              unaryNode(
-                "DataSetCalc",
-                batchTableNode(1),
-                term("select", "0 AS $f0")),
-              tuples(List(null)), term("values", "$f0")
-            ),
-            term("union", "$f0")
-          ),
-          term("select", "COUNT(*) AS cnt")
-        )
-
-    util.verifySql(queryLeftJoin, expected)
-  }
-
-  @Test
-  def testRightSingleLeftJoinNotEqualPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Long, Int)]("A", 'a1, 'a2)
-    util.addTable[(Int, Int)]("B", 'b1, 'b2)
-
-    val queryLeftJoin =
-      "SELECT a2 " +
-        "FROM A " +
-        "  LEFT JOIN " +
-        "(SELECT COUNT(*) AS cnt FROM B) AS x " +
-        "  ON a1 > cnt"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetSingleRowJoin",
-          batchTableNode(0),
-          term("where", ">(a1, cnt)"),
-          term("join", "a1", "a2", "cnt"),
-          term("joinType", "NestedLoopLeftJoin")
-        ),
-        term("select", "a2")
-      ) + "\n" +
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetUnion",
-            unaryNode(
-              "DataSetValues",
-              unaryNode(
-                "DataSetCalc",
-                batchTableNode(1),
-                term("select", "0 AS $f0")),
-              tuples(List(null)), term("values", "$f0")
-            ),
-            term("union", "$f0")
-          ),
-          term("select", "COUNT(*) AS cnt")
-        )
-
-    util.verifySql(queryLeftJoin, expected)
-  }
-
-  @Test
-  def testLeftSingleRightJoinEqualPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Long, Long)]("A", 'a1, 'a2)
-    util.addTable[(Long, Long)]("B", 'b1, 'b2)
-
-    val queryRightJoin =
-      "SELECT a1 " +
-        "FROM (SELECT COUNT(*) AS cnt FROM B) " +
-        "  RIGHT JOIN " +
-        "A " +
-        "  ON cnt = a2"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetSingleRowJoin",
-          "",
-          term("where", "=(cnt, a2)"),
-          term("join", "cnt", "a1", "a2"),
-          term("joinType", "NestedLoopRightJoin")
-        ),
-        term("select", "a1")
-      ) + unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetUnion",
-            unaryNode(
-              "DataSetValues",
-              unaryNode(
-                "DataSetCalc",
-                batchTableNode(1),
-                term("select", "0 AS $f0")),
-              tuples(List(null)), term("values", "$f0")
-            ),
-            term("union", "$f0")
-          ),
-          term("select", "COUNT(*) AS cnt")
-        ) + "\n" +
-        batchTableNode(0)
-
-    util.verifySql(queryRightJoin, expected)
-  }
-
-  @Test
-  def testLeftSingleRightJoinNotEqualPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Long, Long)]("A", 'a1, 'a2)
-    util.addTable[(Long, Long)]("B", 'b1, 'b2)
-
-    val queryRightJoin =
-      "SELECT a1 " +
-        "FROM (SELECT COUNT(*) AS cnt FROM B) " +
-        "  RIGHT JOIN " +
-        "A " +
-        "  ON cnt < a2"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetSingleRowJoin",
-          "",
-          term("where", "<(cnt, a2)"),
-          term("join", "cnt", "a1", "a2"),
-          term("joinType", "NestedLoopRightJoin")
-        ),
-        term("select", "a1")
-      ) +
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetUnion",
-            unaryNode(
-              "DataSetValues",
-              unaryNode(
-                "DataSetCalc",
-                batchTableNode(1),
-                term("select", "0 AS $f0")),
-              tuples(List(null)), term("values", "$f0")
-            ),
-            term("union", "$f0")
-          ),
-          term("select", "COUNT(*) AS cnt")
-        ) + "\n" +
-        batchTableNode(0)
-
-    util.verifySql(queryRightJoin, expected)
-  }
-
-  @Test
-  def testSingleRowJoinInnerJoin(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Int)]("A", 'a1, 'a2)
-    val query =
-      "SELECT a2, sum(a1) " +
-        "FROM A " +
-        "GROUP BY a2 " +
-        "HAVING sum(a1) > (SELECT sum(a1) * 0.1 FROM A)"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetSingleRowJoin",
-          unaryNode(
-            "DataSetAggregate",
-            batchTableNode(0),
-            term("groupBy", "a2"),
-            term("select", "a2", "SUM(a1) AS EXPR$1")
-          ),
-          term("where", ">(EXPR$1, EXPR$0)"),
-          term("join", "a2", "EXPR$1", "EXPR$0"),
-          term("joinType", "NestedLoopInnerJoin")
-        ),
-        term("select", "a2", "EXPR$1")
-      ) + "\n" +
-        unaryNode(
-          "DataSetCalc",
-          unaryNode(
-            "DataSetAggregate",
-            unaryNode(
-              "DataSetUnion",
-              unaryNode(
-                "DataSetValues",
-                unaryNode(
-                  "DataSetCalc",
-                  batchTableNode(0),
-                  term("select", "a1")
-                ),
-                tuples(List(null)), term("values", "a1")
-              ),
-              term("union", "a1")
-            ),
-            term("select", "SUM(a1) AS $f0")
-          ),
-          term("select", "*($f0, 0.1) AS EXPR$0")
-        )
-
-    util.verifySql(query, expected)
-  }
-}
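
The common thread in the plans above: a global-aggregate subquery is known to
produce exactly one row, so the optimizer replaces a shuffle join with
DataSetSingleRowJoin, broadcasting that row and evaluating the (possibly
non-equi) predicate in a nested loop. A runnable sketch of the equi-join
variant, with illustrative data:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object SingleRowJoinExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    tEnv.registerTable(
      "A", env.fromElements((1, "one"), (2, "two")).toTable(tEnv, 'a1, 'a2))

    // count(a1) yields a single row (here cnt = 2); the expected plans above
    // show it joined via NestedLoopInnerJoin with predicate =(CAST(a1), cnt).
    val result = tEnv.sql(
      "SELECT a1, a2 FROM A, (SELECT count(a1) AS cnt FROM A) WHERE a1 = cnt")
    result.toDataSet[Row].print()  // prints: 2,two
  }
}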

