flink-commits mailing list archives

From twal...@apache.org
Subject [1/2] flink git commit: [FLINK-5084] [table] Replace Java Table API integration tests by unit tests
Date Tue, 10 Jan 2017 11:30:26 GMT
Repository: flink
Updated Branches:
  refs/heads/master 614acc3e7 -> 649cf054e
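
For orientation: every test added in this commit builds the same query twice, once with the Scala expression DSL and once with the string-based (Java) expression syntax, and then asserts that both variants translate to the same logical plan; no Flink job is executed. A condensed sketch of that pattern, distilled from the files below (the class and method names are illustrative only and not part of the commit):

package org.apache.flink.table.api.scala.batch.table.stringexpr

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.junit._

class StringExpressionPatternSketch {

  @Test
  def testScalaAndStringVariantsProduceSamePlan(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)

    // Same query, expressed once with Scala expressions and once as a string expression.
    val scalaVariant = t.groupBy('b).select('b, 'a.sum)
    val stringVariant = t.groupBy("b").select("b, a.sum")

    // Only the translated logical plans are compared, which is what turns the
    // former integration tests into unit tests.
    Assert.assertEquals("Logical Plans do not match",
      scalaVariant.logicalPlan, stringVariant.logicalPlan)
  }
}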


http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
new file mode 100644
index 0000000..ace53d9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
+import org.junit._
+
+class AggregationsStringExpressionTest {
+
+  @Test
+  def testAggregationTypes(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+
+    val t1 = t.select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
+    val t2 = t.select("_1.sum, _1.min, _1.max, _1.count, _1.avg")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testWorkingAggregationDataTypes(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
+
+    val t1 = t.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
+    val t2 = t.select("_1.avg, _2.avg, _3.avg, _4.avg, _5.avg, _6.avg, _7.count")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testProjection(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements(
+      (1: Byte, 1: Short),
+      (2: Byte, 2: Short)).toTable(tEnv)
+
+    val t1 = t.select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
+    val t2 = t.select("_1.avg, _1.sum, _1.count, _2.avg, _2.sum")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testAggregationWithArithmetic(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
+
+    val t1 = t.select(('_1 + 2).avg + 2, '_2.count + 5)
+    val t2 = t.select("(_1 + 2).avg + 2, _2.count + 5")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testAggregationWithTwoCount(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
+
+    val t1 = t.select('_1.count, '_2.count)
+    val t2 = t.select("_1.count, _2.count")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testAggregationAfterProjection(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
+
+    val t1 = t.select('_1, '_2, '_3)
+      .select('_1.avg, '_2.sum, '_3.count)
+
+    val t2 = t.select("_1, _2, _3")
+      .select("_1.avg, _2.sum, _3.count")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testDistinct(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val distinct = ds.select('b).distinct()
+    val distinct2 = ds.select("b").distinct()
+
+    val lPlan1 = distinct.logicalPlan
+    val lPlan2 = distinct2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+
+    val distinct = ds.groupBy('a, 'e).select('e).distinct()
+    val distinct2 = ds.groupBy("a, e").select("e").distinct()
+
+    val lPlan1 = distinct.logicalPlan
+    val lPlan2 = distinct2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testGroupedAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = t.groupBy('b).select('b, 'a.sum)
+    val t2 = t.groupBy("b").select("b, a.sum")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testGroupingKeyForwardIfNotUsed(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = t.groupBy('b).select('a.sum)
+    val t2 = t.groupBy("b").select("a.sum")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testGroupNoAggregation(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = t
+      .groupBy('b)
+      .select('a.sum as 'd, 'b)
+      .groupBy('b, 'd)
+      .select('b)
+
+    val t2 = t
+      .groupBy("b")
+      .select("a.sum as d, b")
+      .groupBy("b, d")
+      .select("b")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testGroupedAggregateWithConstant1(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = t.select('a, 4 as 'four, 'b)
+      .groupBy('four, 'a)
+      .select('four, 'b.sum)
+
+    val t2 = t.select("a, 4 as four, b")
+      .groupBy("four, a")
+      .select("four, b.sum")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testGroupedAggregateWithConstant2(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = t.select('b, 4 as 'four, 'a)
+      .groupBy('b, 'four)
+      .select('four, 'a.sum)
+    val t2 = t.select("b, 4 as four, a")
+      .groupBy("b, four")
+      .select("four, a.sum")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testGroupedAggregateWithExpression(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+
+    val t1 = t.groupBy('e, 'b % 3)
+      .select('c.min, 'e, 'a.avg, 'd.count)
+    val t2 = t.groupBy("e, b % 3")
+      .select("c.min, e, a.avg, d.count")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testGroupedAggregateWithFilter(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = t.groupBy('b)
+      .select('b, 'a.sum)
+      .where('b === 2)
+    val t2 = t.groupBy("b")
+      .select("b, a.sum")
+      .where("b = 2")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
new file mode 100644
index 0000000..a5a5241
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.stringexpr
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.expressions.Literal
+import org.junit._
+
+class CalcStringExpressionTest {
+
+  @Test
+  def testSimpleSelectAllWithAs(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = t.select('a, 'b, 'c)
+    val t2 = t.select("a, b, c")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+
+    val t1 = t
+      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
+      .select('a, 'b)
+
+    val t2 = t
+      .select("_1 as a, _2 as b, _1 as c")
+      .select("a, b")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testSimpleSelectRenameAll(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+
+    val t1 = t
+      .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
+      .select('a, 'b)
+
+    val t2 = t
+      .select("_1 as a, _2 as b, _3 as c")
+      .select("a, b")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testSelectStar(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = t.select('*)
+    val t2 = t.select("*")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = ds.filter( Literal(false) )
+    val t2 = ds.filter("false")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = ds.filter( Literal(true) )
+    val t2 = ds.filter("true")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testFilterOnStringTupleField(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = ds.filter( 'c.like("%world%") )
+    val t2 = ds.filter("c.like('%world%')")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = ds.filter( 'a % 2 === 0 )
+    val t2 = ds.filter( "a % 2 = 0 ")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testNotEquals(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = ds.filter( 'a % 2 !== 0 )
+    val t2 = ds.filter("a % 2 <> 0")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testDisjunctivePredicate(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = ds.filter( 'a < 2 || 'a > 20)
+    val t2 = ds.filter("a < 2 || a > 20")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testConsecutiveFilters(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val t1 = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
+    val t2 = ds.filter("a % 2 != 0").filter("b % 2 = 0")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testFilterBasicType(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.getStringDataSet(env).toTable(tEnv, 'a)
+
+    val t1 = ds.filter( 'a.like("H%") )
+    val t2 = ds.filter( "a.like('H%')" )
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testFilterOnCustomType(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val t = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
+
+    val t1 = t.filter( 's.like("%a%") )
+    val t2 = t.filter("s.like('%a%')")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testSimpleCalc(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+
+    val t1 = t.select('_1, '_2, '_3)
+      .where('_1 < 7)
+      .select('_1, '_3)
+
+    val t2 = t.select("_1, _2, _3")
+      .where("_1 < 7")
+      .select("_1, _3")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testCalcWithTwoFilters(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+
+    val t1 = t.select('_1, '_2, '_3)
+      .where('_1 < 7 && '_2 === 3)
+      .select('_1, '_3)
+      .where('_1 === 4)
+      .select('_1)
+
+    val t2 = t.select("_1, _2, _3")
+      .where("_1 < 7 && _2 = 3")
+      .select("_1, _3")
+      .where("_1 === 4")
+      .select("_1")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testCalcWithAggregation(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+
+    val t1 = t.select('_1, '_2, '_3)
+      .where('_1 < 15)
+      .groupBy('_2)
+      .select('_1.min, '_2.count as 'cnt)
+      .where('cnt > 3)
+
+
+    val t2 = t.select("_1, _2, _3")
+      .where("_1 < 15")
+      .groupBy("_2")
+      .select("_1.min, _2.count as cnt")
+      .where("cnt > 3")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testCalcJoin(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
+      .where('b > 1).select('a, 'd).where('d === 2)
+    val t2 = ds1.select("a, b").join(ds2).where("b = e").select("a, b, d, e, f")
+      .where("b > 1").select("a, d").where("d = 2")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testAdvancedDataTypes(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = env
+      .fromElements((
+        BigDecimal("78.454654654654654").bigDecimal,
+        BigDecimal("4E+9999").bigDecimal,
+        Date.valueOf("1984-07-12"),
+        Time.valueOf("14:34:24"),
+        Timestamp.valueOf("1984-07-12 14:34:24")))
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+
+    val t1 = t.select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
+        "1984-07-12".cast(Types.DATE), "14:34:24".cast(Types.TIME),
+        "1984-07-12 14:34:24".cast(Types.TIMESTAMP))
+    val t2 = t.select("a, b, c, d, e, 11.2, 11.2," +
+      "'1984-07-12'.toDate, '14:34:24'.toTime," +
+      "'1984-07-12 14:34:24'.toTimestamp")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1.toString, lPlan2.toString)
+  }
+
+  @Test
+  def testIntegerBiggerThan128(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val t = env.fromElements((300, 1L, "Hello")).toTable(tableEnv, 'a, 'b, 'c)
+
+    val t1 = t.filter('a === 300)
+    val t2 = t.filter("a = 300")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
new file mode 100644
index 0000000..19d27fe
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.Types._
+import org.apache.flink.table.api.scala._
+import org.junit._
+
+class CastingStringExpressionTest {
+
+  @Test
+  def testNumericAutocastInArithmetic() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements(
+      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tableEnv)
+    val t1 = table.select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f,
+      '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
+    val t2 = table.select("_1 + 1, _2 +" +
+      " 1, _3 + 1L, _4 + 1.0f, _5 + 1.0d, _6 + 1, _7 + 1.0d, _8 + _1")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  @throws[Exception]
+  def testNumericAutocastInComparison() {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements(
+      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d),
+      (2.toByte, 2.toShort, 2, 2L, 2.0f, 2.0d))
+      .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e, 'f)
+    val t1 = table.filter('a > 1 && 'b > 1 && 'c > 1L &&
+      'd > 1.0f && 'e > 1.0d && 'f > 1)
+    val t2 = table
+      .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  @throws[Exception]
+  def testCasting() {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements((1, 0.0, 1L, true)).toTable(tableEnv)
+    val t1 = table.select(
+      // * -> String
+      '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING),
+      // NUMERIC TYPE -> Boolean
+      '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), '_3.cast(BOOLEAN),
+      // NUMERIC TYPE -> NUMERIC TYPE
+      '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT),
+      // Boolean -> NUMERIC TYPE
+      '_4.cast(DOUBLE),
+      // identity casting
+      '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN))
+    val t2 = table.select(
+      // * -> String
+      "_1.cast(STRING), _2.cast(STRING), _3.cast(STRING), _4.cast(STRING)," +
+        // NUMERIC TYPE -> Boolean
+        "_1.cast(BOOL), _2.cast(BOOL), _3.cast(BOOL)," +
+        // NUMERIC TYPE -> NUMERIC TYPE
+        "_1.cast(DOUBLE), _2.cast(INT), _3.cast(SHORT)," +
+        // Boolean -> NUMERIC TYPE
+        "_4.cast(DOUBLE)," +
+        // identity casting
+        "_1.cast(INT), _2.cast(DOUBLE), _3.cast(LONG), _4.cast(BOOL)")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  @throws[Exception]
+  def testCastFromString() {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements(("1", "true", "2.0")).toTable(tableEnv)
+    val t1 = table.select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG),
+        '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN))
+    val t2 = table.select(
+      "_1.cast(BYTE), _1.cast(SHORT), _1.cast(INT), _1.cast(LONG), " +
+        "_3.cast(DOUBLE), _3.cast(FLOAT), _2.cast(BOOL)")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
new file mode 100644
index 0000000..025cda9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
+import org.apache.flink.table.expressions.Literal
+import org.junit._
+
+class JoinStringExpressionTest {
+
+  @Test
+  def testJoin(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val t1Scala = ds1.join(ds2).where('b === 'e).select('c, 'g)
+    val t1Java = ds1.join(ds2).where("b === e").select("c, g")
+
+    val lPlan1 = t1Scala.logicalPlan
+    val lPlan2 = t1Java.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testJoinWithFilter(): Unit = {
+
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+
+    val t1Scala = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
+    val t1Java = ds1.join(ds2).where("b === e && b < 2").select("c, g")
+
+    val lPlan1 = t1Scala.logicalPlan
+    val lPlan2 = t1Java.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testJoinWithJoinFilter(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val t1Scala = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
+    val t1Java = ds1.join(ds2).where("b === e && a < 6 && h < b").select("c, g")
+
+    val lPlan1 = t1Scala.logicalPlan
+    val lPlan2 = t1Java.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testJoinWithMultipleKeys(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val t1Scala = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
+    val t1Java = ds1.join(ds2).filter("a === d && b === h").select("c, g")
+
+    val lPlan1 = t1Scala.logicalPlan
+    val lPlan2 = t1Java.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testJoinWithAggregation(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val t1Scala = ds1.join(ds2).where('a === 'd).select('g.count)
+    val t1Java = ds1.join(ds2).where("a === d").select("g.count")
+
+    val lPlan1 = t1Scala.logicalPlan
+    val lPlan2 = t1Java.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testJoinWithGroupedAggregation(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.join(ds2)
+      .where('a === 'd)
+      .groupBy('a, 'd)
+      .select('b.sum, 'g.count)
+    val t2 = ds1.join(ds2)
+      .where("a = d")
+      .groupBy("a, d")
+      .select("b.sum, g.count")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(lPlan1.toString),
+      LogicalPlanFormatUtils.formatTempTableId(lPlan2.toString))
+  }
+
+  @Test
+  def testJoinPushThroughJoin(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'j, 'k, 'l)
+
+    val t1 = ds1.join(ds2)
+      .where(Literal(true))
+      .join(ds3)
+      .where('a === 'd && 'e === 'k)
+      .select('a, 'f, 'l)
+    val t2 = ds1.join(ds2)
+      .where("true")
+      .join(ds3)
+      .where("a === d && e === k")
+      .select("a, f, l")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testJoinWithDisjunctivePred(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10)).select('c, 'g)
+    val t2 = ds1.join(ds2).filter("a = d && (b = e || b = e - 10)").select("c, g")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testJoinWithExpressionPreds(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
+    val t2 = ds1.join(ds2).filter("b = h + 1 && a - 1 = d + 2").select("c, g")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testLeftJoinWithMultipleKeys(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
+    val t2 = ds1.leftOuterJoin(ds2, "a = d && b = h").select("c, g")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testRightJoinWithMultipleKeys(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
+    val t2 = ds1.rightOuterJoin(ds2, "a = d && b = h").select("c, g")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testRightJoinWithNotOnlyEquiJoin(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+    val t2 = ds1.rightOuterJoin(ds2, "a = d && b < h").select("c, g")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testFullOuterJoinWithMultipleKeys(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val t1 = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
+    val t2 = ds1.fullOuterJoin(ds2, "a = d && b = h").select("c, g")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
new file mode 100644
index 0000000..5cf3f84
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.junit._
+
+class AggregationsValidationTest {
+
+  @Test(expected = classOf[ValidationException])
+  def testNonWorkingAggregationDataTypes(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    env.fromElements(("Hello", 1)).toTable(tEnv)
+      // Must fail. Field '_1 is not a numeric type.
+      .select('_1.sum)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNoNestedAggregations(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    env.fromElements(("Hello", 1)).toTable(tEnv)
+      // Must fail. Sum aggregation cannot be chained.
+      .select('_2.sum.sum)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingOnNonExistentField(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      // must fail. '_foo is not a valid field
+      .groupBy('_foo)
+      .select('a.avg)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingInvalidSelection(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('a, 'b)
+      // must fail. 'c is not a grouping key or aggregation
+      .select('c)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testAggregationOnNonExistingField(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+      // Must fail. Field 'foo does not exist.
+      .select('foo.avg)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testAggregationOnNonExistingFieldJava() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv)
+    table.select("foo.avg")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testNonWorkingAggregationDataTypesJava() {
+    val env= ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements((1f, "Hello")).toTable(tableEnv)
+    // Must fail. Cannot compute SUM aggregate on String field.
+    table.select("f1.sum")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testNoNestedAggregationsJava() {
+    val env= ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements((1f, "Hello")).toTable(tableEnv)
+    // Must fail. Aggregation on aggregation is not allowed.
+    table.select("f0.sum.sum")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testGroupingOnNonExistentFieldJava() {
+    val env= ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    input
+      // must fail. Field foo is not in input
+      .groupBy("foo")
+      .select("a.avg")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testGroupingInvalidSelectionJava() {
+    val env= ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    input
+      .groupBy("a, b")
+      // must fail. Field c is not a grouping key or aggregation
+      .select("c")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
new file mode 100644
index 0000000..846585b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+class CalcValidationTest {
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectInvalidFieldFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      // must fail. Field 'foo does not exist
+      .select('a, 'foo)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectAmbiguousRenaming(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      // must fail. 'a and 'b are both renamed to 'foo
+      .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectAmbiguousRenaming2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      // must fail. 'a and 'b are both renamed to 'a
+      .select('a, 'b as 'a).toDataSet[Row].print()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFilterInvalidFieldName(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    // must fail. Field 'foo does not exist
+    ds.filter( 'foo === 2 )
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testSelectInvalidField() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+
+    // Must fail. Field foo does not exist
+    ds.select("a + 1, foo + 2")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testSelectAmbiguousFieldNames() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+
+    // Must fail. Fields a and b are both renamed to foo.
+    ds.select("a + 1 as foo, b + 2 as foo")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testFilterInvalidField() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+
+    // Must fail. Field foo does not exist.
+    table.filter("foo = 17")
+  }
+
+  @Test
+  def testAliasStarException(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
+      fail("TableException expected")
+    } catch {
+      case _: TableException => //ignore
+    }
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+        .select('_1 as '*, '_2 as 'b, '_1 as 'c)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala
new file mode 100644
index 0000000..4837fcc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.junit._
+
+class JoinValidationTest {
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinNonExistingKey(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.join(ds2)
+      // must fail. Field 'foo does not exist
+      .where('foo === 'e)
+      .select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinWithNonMatchingKeyTypes(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.join(ds2)
+      // must fail. Field 'a is Int, and 'g is String
+      .where('a === 'g)
+      .select('c, 'g).collect()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinWithAmbiguousFields(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c)
+
+    ds1.join(ds2)
+      // must fail. Both inputs share the same field 'c
+      .where('a === 'd)
+      .select('c, 'g)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testNoEqualityJoinPredicate1(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.join(ds2)
+      // must fail. Both fields of the predicate come from the second input, so there is
+      // no equality join predicate between the two inputs
+      .where('d === 'f)
+      .select('c, 'g).collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testNoEqualityJoinPredicate2(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.join(ds2)
+      // must fail. No equality join predicate
+      .where('a < 'd)
+      .select('c, 'g).collect()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinTablesFromDifferentEnvs(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.join(ds2).where('b === 'e).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNoJoinCondition(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNoEquiJoin(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g)
+  }
+
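+  // The *Java variants below exercise the String-based expression syntax used by the Java Table API.
+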
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testJoinNonExistingKeyJava(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val in1 = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
+    val in2 = tableEnv.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'h)
+    // Must fail. Field foo does not exist.
+    in1.join(in2).where("foo === e").select("c, g")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testJoinWithNonMatchingKeyTypesJava(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val in1 = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
+    val in2 = tableEnv.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c)
+    in1.join(in2)
+      // Must fail. Types of join fields are not compatible (Integer and String)
+    .where("a === g").select("c, g")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testJoinWithAmbiguousFieldsJava(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val in1 = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
+    val in2 = tableEnv.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c)
+    // Must fail. Join inputs have overlapping field names.
+    in1.join(in2).where("a === d").select("c, g")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinTablesFromDifferentEnvsJava(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val in1 = tEnv1.fromDataSet(ds1, 'a, 'b, 'c)
+    val in2 = tEnv2.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c)
+    // Must fail. Tables are bound to different TableEnvironments.
+    in1.join(in2).where("a === d").select("g.count")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala
new file mode 100644
index 0000000..baac9ea
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.junit._
+
+class SetOperatorsValidationTest {
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionDifferentColumnSize(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
+
+    // must fail. Union inputs have different column size.
+    ds1.unionAll(ds2)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionDifferentFieldTypes(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    // must fail. Union inputs have different field types.
+    ds1.unionAll(ds2)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionTablesFromDifferentEnvs(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.unionAll(ds2).select('c)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMinusDifferentFieldTypes(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    // must fail. Minus inputs have different field types.
+    ds1.minus(ds2)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMinusAllTablesFromDifferentEnvs(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.minusAll(ds2).select('c)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testIntersectWithDifferentFieldTypes(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    // must fail. Intersect inputs have different field types.
+    ds1.intersect(ds2)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testIntersectTablesFromDifferentEnvs(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.intersect(ds2).select('c)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala
new file mode 100644
index 0000000..4188f51
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.junit._
+
+class SortValidationTest {
+
+  def getExecutionEnvironment = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(4)
+    env
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFetchWithoutOrder(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
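+    // must fail. limit() is applied without a preceding orderBy()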
+    val t = ds.toTable(tEnv).limit(0, 5)
+
+    t.toDataSet[Row].collect()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala
new file mode 100644
index 0000000..435d6b9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.utils
+
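+/**
+  * Test utility for comparing the string representation of logical plans.
+  * Temporary table ids (TMP_<n>) are rebased to the smallest id occurring in the plan so
+  * that plans produced by independently constructed queries can be compared for equality.
+  */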
+object LogicalPlanFormatUtils {
+  private val tempPattern = """TMP_\d+""".r
+
+  def formatTempTableId(preStr: String): String = {
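+    // normalize the printed collection name, then rebase all TMP_<n> ids to the smallest one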
+    val str = preStr.replaceAll("ArrayBuffer\\(", "List\\(")
+    val minId = getMinTempTableId(str)
+    tempPattern.replaceAllIn(str, s => "TMP_" + (s.matched.substring(4).toInt - minId) )
+  }
+
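+  /** Returns the smallest TMP_<n> id that occurs in the given plan string. */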
+  private def getMinTempTableId(logicalStr: String): Long = {
+    tempPattern.findAllIn(logicalStr).map(s => {
+      s.substring(4).toInt
+    }).min
+  }
+}

