flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [46/50] [abbrv] flink git commit: [tableAPI] Cleaned up tests
Date Fri, 18 Mar 2016 13:48:40 GMT
[tableAPI] Cleaned up tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29fb3d27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29fb3d27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29fb3d27

Branch: refs/heads/master
Commit: 29fb3d27ca605fe9019034a220ae8b3ae5dac7a3
Parents: b9f1ff0
Author: Fabian Hueske <fhueske@apache.org>
Authored: Thu Mar 17 13:50:32 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Fri Mar 18 14:44:52 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/api/table/table.scala      | 26 +++++-
 .../api/java/table/test/AggregationsITCase.java |  3 +-
 .../flink/api/java/table/test/AsITCase.java     |  9 ++
 .../api/java/table/test/CastingITCase.java      |  7 +-
 .../flink/api/java/table/test/FilterITCase.java | 40 ++++----
 .../table/test/GroupedAggregationsITCase.java   | 16 +---
 .../flink/api/java/table/test/JoinITCase.java   | 27 ++----
 .../flink/api/java/table/test/SelectITCase.java | 56 ++---------
 .../api/java/table/test/SqlExplainITCase.java   | 98 +++++---------------
 .../table/test/StringExpressionsITCase.java     |  6 +-
 .../flink/api/java/table/test/UnionITCase.java  | 35 ++++---
 .../scala/table/test/AggregationsITCase.scala   |  8 +-
 .../flink/api/scala/table/test/AsITCase.scala   | 20 ++--
 .../api/scala/table/test/CastingITCase.scala    |  6 +-
 .../api/scala/table/test/FilterITCase.scala     | 38 +++-----
 .../table/test/GroupedAggregationsITCase.scala  | 13 ---
 .../flink/api/scala/table/test/JoinITCase.scala | 30 +++---
 .../api/scala/table/test/SelectITCase.scala     | 42 +++------
 .../api/scala/table/test/SqlExplainTest.scala   | 46 +++++----
 .../table/test/StringExpressionsITCase.scala    |  6 +-
 .../api/scala/table/test/UnionITCase.scala      | 18 ++--
 .../test/utils/TableProgramsTestBase.scala      |  1 -
 22 files changed, 215 insertions(+), 336 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 709af62..0e480e8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -36,6 +36,7 @@ import org.apache.flink.api.table.expressions.{ExpressionParser, Naming, Unresol
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 
+import scala.collection.mutable
 import scala.collection.JavaConverters._
 
 case class BaseTable(
@@ -84,6 +85,8 @@ class Table(
     */
   def select(fields: Expression*): Table = {
 
+    checkUniqueNames(fields)
+
     relBuilder.push(relNode)
 
     // separate aggregations and selection expressions
@@ -394,10 +397,31 @@ class Table(
       }
 
     }
-
     LogicalProject.create(relNode, exprs.toList.asJava, names.toList.asJava)
   }
 
+  /**
+    * Rejects duplicate field names among explicitly named expressions and field references.
+    *
+    * @param exprs expressions to check
+    * @throws IllegalArgumentException if a field name occurs more than once
+    */
+  private def checkUniqueNames(exprs: Seq[Expression]): Unit = {
+    val names: mutable.Set[String] = mutable.Set()
+
+    // mutable.Set.add returns false when the element was already present
+    def register(name: String): Unit =
+      if (!names.add(name)) {
+        throw new IllegalArgumentException(s"Duplicate field name ${name}.")
+      }
+
+    exprs.foreach {
+      case n: Naming => register(n.name) // explicit name
+      case u: UnresolvedFieldReference => register(u.name) // simple field forwarding
+      case _ => // Do nothing
+    }
+  }
+
 }
 
 class GroupedTable(

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index 9797950..597daea 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -132,7 +132,6 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		Table result =
 				table.select("(f0 + 2).avg + 2, f1.count + 5");
 
-
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = ds.collect();
 		String expected = "5.5,7";
@@ -172,6 +171,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 				tableEnv.fromDataSet(input);
 
 		Table result =
+				// Must fail. Cannot compute SUM aggregate on String field.
 				table.select("f1.sum");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
@@ -191,6 +191,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 				tableEnv.fromDataSet(input);
 
 		Table result =
+				// Must fail. Aggregation on aggregation not allowed.
 				table.select("f0.sum.sum");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
index 097339e..faa6b0c 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -224,6 +224,7 @@ public class AsITCase extends TableProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
+		// Must fail. Not enough field names specified.
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
 	}
 
@@ -232,6 +233,7 @@ public class AsITCase extends TableProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
+		// Must fail. Too many field names specified.
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
 	}
 
@@ -240,6 +242,7 @@ public class AsITCase extends TableProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
+		// Must fail. Specified field names are not unique.
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
 	}
 
@@ -248,6 +251,7 @@ public class AsITCase extends TableProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
+		// Must fail. as() only allows field name expressions.
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
 	}
 
@@ -256,11 +260,13 @@ public class AsITCase extends TableProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
+		// Must fail. as() only allows field name expressions.
 		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b,  c");
 	}
 
 	// --------------------------------------------------------------------------------------------
 
+	@SuppressWarnings("unused")
 	public static class SmallPojo {
 
 		public SmallPojo() { }
@@ -278,6 +284,7 @@ public class AsITCase extends TableProgramsTestBase {
 		public String department;
 	}
 
+	@SuppressWarnings("unused")
 	public static class PrivateSmallPojo {
 
 		public PrivateSmallPojo() { }
@@ -327,6 +334,7 @@ public class AsITCase extends TableProgramsTestBase {
 		}
 	}
 
+	@SuppressWarnings("unused")
 	public static class SmallPojo2 {
 
 		public SmallPojo2() { }
@@ -349,6 +357,7 @@ public class AsITCase extends TableProgramsTestBase {
 		}
 	}
 
+	@SuppressWarnings("unused")
 	public static class PrivateSmallPojo2 {
 
 		public PrivateSmallPojo2() { }

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
index 5b22574..5a4adbc 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.codegen.CodeGenException;
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -138,7 +139,8 @@ public class CastingITCase extends TableProgramsTestBase {
 		compareResultAsText(results, expected);
 	}
 
-	@Test(expected = CodeGenException.class)
+	@Ignore // Date type not supported yet
+	@Test
 	public void testCastDateFromString() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
@@ -160,7 +162,8 @@ public class CastingITCase extends TableProgramsTestBase {
 		compareResultAsText(results, expected);
 	}
 
-	@Test(expected = CodeGenException.class)
+	@Ignore // Date type not supported yet
+	@Test
 	public void testCastDateToStringAndLong() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
index c783524..45bd7e4 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.codegen.CodeGenException;
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.junit.Test;
@@ -45,9 +46,7 @@ public class FilterITCase extends TableProgramsTestBase {
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b, c");
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
 
 		Table result = table
 				.filter("false");
@@ -64,9 +63,7 @@ public class FilterITCase extends TableProgramsTestBase {
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b, c");
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
 
 		Table result = table
 				.filter("true");
@@ -89,9 +86,7 @@ public class FilterITCase extends TableProgramsTestBase {
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b, c");
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
 
 		Table result = table
 				.filter(" a % 2 = 0 ");
@@ -110,9 +105,7 @@ public class FilterITCase extends TableProgramsTestBase {
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b, c");
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
 
 		Table result = table
 				.filter("!( a % 2 <> 0 ) ");
@@ -131,9 +124,10 @@ public class FilterITCase extends TableProgramsTestBase {
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
 		Table table = tableEnv.fromDataSet(input, "a, b, c");
-		Table result = table.filter("a < 2 || a > 20");
+
+		Table result = table
+			.filter("a < 2 || a > 20");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = ds.collect();
@@ -147,15 +141,29 @@ public class FilterITCase extends TableProgramsTestBase {
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
-
 		Table table = tableEnv.fromDataSet(input, "a, b, c");
 
-		Table result = table.filter("a = 300 ");
+		Table result = table
+			.filter("a = 300 ");
 
 		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = ds.collect();
 		String expected = "300,1,Hello\n";
 		compareResultAsText(results, expected);
 	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testFilterInvalidField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = getJavaTableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		table
+			// Must fail. Field foo does not exist.
+			.filter("foo = 17");
+	}
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
index 91d1976..fe85678 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
@@ -73,9 +73,7 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b, c");
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
 
 		Table result = table
 				.groupBy("b").select("b, a.sum");
@@ -88,17 +86,11 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testGroupingKeyForwardIfNotUsed() throws Exception {
-
-		// the grouping key needs to be forwarded to the intermediate DataSet, even
-		// if we don't want the key in the output
-
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b, c");
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
 
 		Table result = table
 				.groupBy("b").select("a.sum");
@@ -116,9 +108,7 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
 		TableEnvironment tableEnv = new TableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		Table table =
-			tableEnv.fromDataSet(input, "a, b, c");
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
 
 		Table result = table
 			.groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b");

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
index 7b8b6ec..d20c890 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
@@ -131,12 +131,8 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
 		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
 
-		Table result = in1.join(in2).where("foo === e").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
+		// Must fail. Field foo does not exist.
+		in1.join(in2).where("foo === e").select("c, g");
 	}
 
 	@Test(expected = InvalidProgramException.class)
@@ -150,13 +146,11 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
 		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
 
-		Table result = in1
-				.join(in2).where("a === g").select("c, g");
+		Table result = in1.join(in2)
+			// Must fail. Types of join fields are not compatible (Integer and String)
+			.where("a === g").select("c, g");
 
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
+		tableEnv.toDataSet(result, Row.class).collect();
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -170,13 +164,8 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
 		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, c");
 
-		Table result = in1
-				.join(in2).where("a === d").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
+		// Must fail. Join inputs have overlapping field names.
+		in1.join(in2).where("a === d").select("c, g");
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
index 82a2e4a..24cae60 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -45,7 +45,6 @@ public class SelectITCase extends TableProgramsTestBase {
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
 		Table in = tableEnv.fromDataSet(ds, "a,b,c");
 
 		Table result = in
@@ -61,7 +60,6 @@ public class SelectITCase extends TableProgramsTestBase {
 			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
 			"20,6,Comment#14\n" + "21,6,Comment#15\n";
 		compareResultAsText(results, expected);
-
 	}
 
 	@Test
@@ -70,7 +68,6 @@ public class SelectITCase extends TableProgramsTestBase {
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
 		Table in = tableEnv.fromDataSet(ds);
 
 		Table result = in
@@ -91,7 +88,6 @@ public class SelectITCase extends TableProgramsTestBase {
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
 		Table in = tableEnv.fromDataSet(ds);
 
 		Table result = in
@@ -107,62 +103,26 @@ public class SelectITCase extends TableProgramsTestBase {
 	}
 
 	@Test(expected = IllegalArgumentException.class)
-	public void testAsWithToFewFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		TableEnvironment tableEnv = getJavaTableEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = " sorry dude ";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testAsWithToManyFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		TableEnvironment tableEnv = getJavaTableEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		Table in = tableEnv.fromDataSet(ds, "a, b, c, d");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = " sorry dude ";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testAsWithAmbiguousFields() throws Exception {
+	public void testSelectInvalidField() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 
-		Table in = tableEnv.fromDataSet(ds, "a, b, c, b");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = " today's not your day ";
-		compareResultAsText(results, expected);
+		tableEnv.fromDataSet(ds, "a, b, c")
+			// Must fail. Field foo does not exist
+			.select("a + 1, foo + 2");
 	}
 
 	@Test(expected = IllegalArgumentException.class)
-	public void testOnlyFieldRefInAs() throws Exception {
+	public void testSelectAmbiguousFieldNames() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = getJavaTableEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 
-		Table in = tableEnv.fromDataSet(ds, "a, b as c, d");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "sorry bro";
-		compareResultAsText(results, expected);
+		tableEnv.fromDataSet(ds, "a, b, c")
+			// Must fail. Field name foo is used twice (ambiguous).
+			.select("a + 1 as foo, b + 2 as foo");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
index 9e09664..c985a37 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.table.test;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.plan.TranslationContext;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
@@ -40,19 +41,6 @@ public class SqlExplainITCase extends MultipleProgramsTestBase {
 
 	private static String testFilePath = SqlExplainITCase.class.getResource("/").getFile();
 
-	public static class WC {
-		public String word;
-		public int count;
-
-		// Public constructor to make it a Flink POJO
-		public WC() {}
-
-		public WC(int count, String word) {
-			this.word = word;
-			this.count = count;
-		}
-	}
-
 	@Before
 	public void resetContext() {
 		TranslationContext.reset();
@@ -63,12 +51,8 @@ public class SqlExplainITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		DataSet<WC> input = env.fromElements(
-				new WC(1,"d"),
-				new WC(2,"d"),
-				new WC(3,"d"));
-
-		Table table = tableEnv.fromDataSet(input, "count as a, word as b");
+		DataSet<Tuple2<Integer, String>> input = env.fromElements(new Tuple2<>(1,"d"));
+		Table table = tableEnv.fromDataSet(input, "a, b");
 
 		String result = table
 				.filter("a % 2 = 0")
@@ -84,12 +68,8 @@ public class SqlExplainITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		DataSet<WC> input = env.fromElements(
-				new WC(1, "d"),
-				new WC(2, "d"),
-				new WC(3, "d"));
-
-		Table table = tableEnv.fromDataSet(input, "count as a, word as b");
+		DataSet<Tuple2<Integer, String>> input = env.fromElements(new Tuple2<>(1,"d"));
+		Table table = tableEnv.fromDataSet(input, "a, b");
 
 		String result = table
 				.filter("a % 2 = 0")
@@ -105,19 +85,10 @@ public class SqlExplainITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		DataSet<WC> input1 = env.fromElements(
-				new WC(1, "d"),
-				new WC(1, "d"),
-				new WC(1, "d"));
-
-		Table table1 = tableEnv.fromDataSet(input1, "count as a, word as b");
-
-		DataSet<WC> input2 = env.fromElements(
-				new WC(1,"d"),
-				new WC(1,"d"),
-				new WC(1,"d"));
-
-		Table table2 = tableEnv.fromDataSet(input2, "count as c, word as d");
+		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
+		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
+		Table table1 = tableEnv.fromDataSet(input1, "a, b");
+		Table table2 = tableEnv.fromDataSet(input2, "c, d");
 
 		String result = table1
 				.join(table2)
@@ -135,19 +106,10 @@ public class SqlExplainITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		DataSet<WC> input1 = env.fromElements(
-				new WC(1, "d"),
-				new WC(1, "d"),
-				new WC(1, "d"));
-
-		Table table1 = tableEnv.fromDataSet(input1, "count as a, word as b");
-
-		DataSet<WC> input2 = env.fromElements(
-				new WC(1, "d"),
-				new WC(1, "d"),
-				new WC(1, "d"));
-
-		Table table2 = tableEnv.fromDataSet(input2, "count as c, word as d");
+		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
+		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
+		Table table1 = tableEnv.fromDataSet(input1, "a, b");
+		Table table2 = tableEnv.fromDataSet(input2, "c, d");
 
 		String result = table1
 				.join(table2)
@@ -165,19 +127,10 @@ public class SqlExplainITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		DataSet<WC> input1 = env.fromElements(
-				new WC(1, "d"),
-				new WC(1, "d"),
-				new WC(1, "d"));
-
-		Table table1 = tableEnv.fromDataSet(input1);
-
-		DataSet<WC> input2 = env.fromElements(
-				new WC(1, "d"),
-				new WC(1, "d"),
-				new WC(1, "d"));
-
-		Table table2 = tableEnv.fromDataSet(input2);
+		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
+		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
+		Table table1 = tableEnv.fromDataSet(input1, "count, word");
+		Table table2 = tableEnv.fromDataSet(input2, "count, word");
 
 		String result = table1
 				.unionAll(table2)
@@ -193,19 +146,10 @@ public class SqlExplainITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
-		DataSet<WC> input1 = env.fromElements(
-				new WC(1, "d"),
-				new WC(1, "d"),
-				new WC(1, "d"));
-
-		Table table1 = tableEnv.fromDataSet(input1);
-
-		DataSet<WC> input2 = env.fromElements(
-				new WC(1, "d"),
-				new WC(1, "d"),
-				new WC(1, "d"));
-
-		Table table2 = tableEnv.fromDataSet(input2);
+		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
+		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
+		Table table1 = tableEnv.fromDataSet(input1, "count, word");
+		Table table2 = tableEnv.fromDataSet(input2, "count, word");
 
 		String result = table1
 				.unionAll(table2)

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
index 707ee66..d95a5f6 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -92,7 +92,8 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		Table in = tableEnv.fromDataSet(ds, "a, b");
 
 		Table result = in
-				.select("a.substring(0, b)");
+			// Must fail. Second parameter of substring must be an Integer not a Double.
+			.select("a.substring(0, b)");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		resultSet.collect();
@@ -110,7 +111,8 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		Table in = tableEnv.fromDataSet(ds, "a, b");
 
 		Table result = in
-				.select("a.substring(b, 15)");
+			// Must fail. First parameter of substring must be an Integer not a String.
+			.select("a.substring(b, 15)");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		resultSet.collect();

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
index 75429c2..167e45c 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
@@ -79,7 +79,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test(expected = IllegalArgumentException.class)
-	public void testUnionFieldsNameNotOverlap1() throws Exception {
+	public void testUnionIncompatibleNumberOfFields() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
@@ -89,16 +89,27 @@ public class UnionITCase extends MultipleProgramsTestBase {
 		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
 		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
 
-		Table selected = in1.unionAll(in2);
+		// Must fail. The number of fields of the union inputs does not match
+		in1.unionAll(in2);
+	}
 
-		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
+	@Test(expected = IllegalArgumentException.class)
+	public void testUnionIncompatibleFieldsName() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "a, b, d");
+
+		// Must fail. Field names of union inputs do not match
+		in1.unionAll(in2);
 	}
 
 	@Test(expected = IllegalArgumentException.class)
-	public void testUnionFieldsNameNotOverlap2() throws Exception {
+	public void testUnionIncompatibleFieldTypes() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
 
@@ -108,12 +119,8 @@ public class UnionITCase extends MultipleProgramsTestBase {
 		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
 		Table in2 = tableEnv.fromDataSet(ds2, "a, b, c, d, e").select("a, b, c");
 
-		Table selected = in1.unionAll(in2);
-
-		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
+		// Must fail. Field types of union inputs do not match
+		in1.unionAll(in2);
 	}
 
 	@Test
@@ -158,4 +165,4 @@ public class UnionITCase extends MultipleProgramsTestBase {
 	      "Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n";
 	    compareResultAsText(results, expected);
 	  }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 81b22ba..0741db8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -51,6 +51,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).toTable
+      // Must fail. Field 'foo does not exist.
       .select('foo.avg)
   }
 
@@ -109,7 +110,6 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   @Test
   def testAggregationAfterProjection(): Unit = {
 
-    // verify AggregateProjectMergeRule.
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(
       (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
@@ -127,6 +127,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("Hello", 1)).toTable
+      // Must fail. Field '_1 is not a numeric type.
       .select('_1.sum)
 
     t.collect()
@@ -137,15 +138,13 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("Hello", 1)).toTable
+      // Must fail. Sum aggregation can not be chained.
       .select('_2.sum.sum)
   }
 
   @Test
   def testSQLStyleAggregations(): Unit = {
 
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
       .select(
@@ -164,7 +163,6 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   @Test
   def testPojoAggregation(): Unit = {
 
-    // test aggregations with a custom WordCount class
     val env = ExecutionEnvironment.getExecutionEnvironment
     val input = env.fromElements(
       MyWC("hello", 1),

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 9f9a3b4..c2e4e96 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -103,7 +103,8 @@ class AsITCase(
   def testAsWithToFewFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env)
+    CollectionDataSets.get3TupleDataSet(env)
+      // Must fail. Number of fields does not match.
       .as('a, 'b)
   }
 
@@ -111,7 +112,8 @@ class AsITCase(
   def testAsWithToManyFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env)
+    CollectionDataSets.get3TupleDataSet(env)
+      // Must fail. Number of fields does not match.
       .as('a, 'b, 'c, 'd)
   }
 
@@ -119,7 +121,8 @@ class AsITCase(
   def testAsWithAmbiguousFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env)
+    CollectionDataSets.get3TupleDataSet(env)
+      // Must fail. Field names not unique.
       .as('a, 'b, 'b)
   }
 
@@ -127,9 +130,8 @@ class AsITCase(
   def testAsWithNonFieldReference1(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    // as can only have field references
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .as('a, 'b, 'c)
+    // Must fail. as() can only have field references
+    CollectionDataSets.get3TupleDataSet(env)
       .as('a + 1, 'b, 'c)
   }
 
@@ -137,15 +139,13 @@ class AsITCase(
   def testAsWithNonFieldReference2(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    // as can only have field references
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .as('a, 'b, 'c)
+    // Must fail. as() can only have field references
+    CollectionDataSets.get3TupleDataSet(env)
       .as('a as 'foo, 'b, 'c)
   }
 
 }
 
 case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
-
   def this() { this("", 0, 0.0, "") }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 111525f..5ca8c7f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -138,7 +138,8 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[CodeGenException])
+  @Ignore // Date types not supported yet
+  @Test
   def testCastDateFromString(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -156,7 +157,8 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[CodeGenException])
+  @Ignore // Date types not supported yet
+  @Test
   def testCastDateToStringAndLong(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000"))

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index ae8ebef..3582c33 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.codegen.CodeGenException
 import org.apache.flink.api.table.expressions.Literal
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import TableProgramsTestBase.TableConfigMode
@@ -42,9 +43,6 @@ class FilterITCase(
 
   @Test
   def testAllRejectingFilter(): Unit = {
-    /*
-     * Test all-rejecting filter.
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
 
@@ -57,9 +55,6 @@ class FilterITCase(
 
   @Test
   def testAllPassingFilter(): Unit = {
-    /*
-     * Test all-passing filter.
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
 
@@ -76,9 +71,6 @@ class FilterITCase(
 
   @Test
   def testFilterOnStringTupleField(): Unit = {
-    /*
-     * Test filter on String tuple field.
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
     val filterDs = ds.filter( 'c.like("%world%") )
@@ -90,9 +82,6 @@ class FilterITCase(
 
   @Test
   def testFilterOnIntegerTupleField(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
 
@@ -108,9 +97,6 @@ class FilterITCase(
 
   @Test
   def testNotEquals(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
 
@@ -124,8 +110,7 @@ class FilterITCase(
   }
 
   @Test
-  def testDisjunctivePreds(): Unit = {
-
+  def testDisjunctivePredicate(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
 
@@ -136,9 +121,7 @@ class FilterITCase(
   }
 
   @Test
-  def testFilterMerge(): Unit = {
-    // verify FilterMergeRule.
-
+  def testConsecutiveFilters(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
 
@@ -152,9 +135,6 @@ class FilterITCase(
 
   @Test
   def testFilterBasicType(): Unit = {
-    /*
-     * Test filter on basic type
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.getStringDataSet(env)
 
@@ -167,9 +147,6 @@ class FilterITCase(
 
   @Test
   def testFilterOnCustomType(): Unit = {
-    /*
-     * Test filter on custom type
-     */
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.getCustomTypeDataSet(env)
     val filterDs = ds.as('myInt as 'i, 'myLong as 'l, 'myString as 's)
@@ -180,4 +157,13 @@ class FilterITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test(expected = classOf[IllegalArgumentException])
+  def testFilterInvalidFieldName(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    // must fail. Field 'foo does not exist
+    ds.filter( 'foo === 2 )
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
index cbed9e3..01aa00f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
@@ -57,9 +57,6 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   @Test
   def testGroupedAggregate(): Unit = {
 
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
       .groupBy('b)
@@ -73,9 +70,6 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   @Test
   def testGroupingKeyForwardIfNotUsed(): Unit = {
 
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
       .groupBy('b)
@@ -129,8 +123,6 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   @Test
   def testGroupedAggregateWithConstant1(): Unit = {
 
-    // verify AggregateProjectPullUpConstantsRule
-
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
       .select('a, 4 as 'four, 'b)
@@ -148,8 +140,6 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   @Test
   def testGroupedAggregateWithConstant2(): Unit = {
 
-    // verify AggregateProjectPullUpConstantsRule
-
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
         .select('b, 4 as 'four, 'a)
@@ -178,9 +168,6 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
   @Test
   def testGroupedAggregateWithFilter(): Unit = {
 
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
       .groupBy('b)

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index 70beaf0..4583bd4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -50,7 +50,6 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithFilter(): Unit = {
 
-    // verify FilterJoinRule.FILTER_ON_JOIN
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -96,11 +95,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
 
-    val joinT = ds1.join(ds2).where('foo === 'e).select('c, 'g)
-
-    val expected = ""
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    ds1.join(ds2)
+      // must fail. Field 'foo does not exist
+      .where('foo === 'e)
+      .select('c, 'g)
   }
 
   @Test(expected = classOf[InvalidProgramException])
@@ -109,11 +107,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
 
-    val joinT = ds1.join(ds2).where('a === 'g).select('c, 'g)
-
-    val expected = ""
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    ds1.join(ds2)
+      // must fail. Field 'a is Int, and 'g is String
+      .where('a === 'g)
+      .select('c, 'g).collect()
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -122,11 +119,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)
 
-    val joinT = ds1.join(ds2).where('a === 'd).select('c, 'g)
-
-    val expected = ""
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    ds1.join(ds2)
+      // must fail. Both inputs share the same field 'c
+      .where('a === 'd)
+      .select('c, 'g)
   }
 
   @Test
@@ -145,7 +141,6 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithGroupedAggregation(): Unit = {
 
-    // verify AggregateJoinTransposeRule
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -163,7 +158,6 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinPushThroughJoin(): Unit = {
 
-    // verify JoinPushThroughJoinRule
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 111aaeb..2065fd4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -74,7 +74,6 @@ class SelectITCase(
   @Test
   def testSimpleSelectWithNaming(): Unit = {
 
-    // verify ProjectMergeRule.
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).toTable
       .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
@@ -103,47 +102,30 @@ class SelectITCase(
   }
 
   @Test(expected = classOf[IllegalArgumentException])
-  def testAsWithToFewFields(): Unit = {
+  def testSelectInvalidFieldFields(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
-
-    val expected = "no"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      // must fail. Field 'foo does not exist
+      .select('a, 'foo)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
-  def testAsWithToManyFields(): Unit = {
+  def testSelectAmbiguousRenaming(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
-
-    val expected = "no"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      // must fail. 'a and 'b are both renamed to 'foo
+      .select('a + 1 as 'foo, 'b + 2 as 'foo).print()
   }
 
   @Test(expected = classOf[IllegalArgumentException])
-  def testAsWithAmbiguousFields(): Unit = {
+  def testSelectAmbiguousRenaming2(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
-
-    val expected = "no"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      // must fail. 'b is renamed to 'a, which clashes with the existing field 'a
+      .select('a, 'b as 'a).print()
   }
 
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testOnlyFieldRefInAs(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd)
-
-    val expected = "no"
-    val results = t.toDataSet[Row](getConfig).collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
index de07b24..861a801 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SqlExplainTest.scala
@@ -26,8 +26,6 @@ import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.junit._
 import org.junit.Assert.assertEquals
 
-case class WC(count: Int, word: String)
-
 class SqlExplainTest
   extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
 
@@ -41,9 +39,9 @@ class SqlExplainTest
   @Test
   def testFilterWithoutExtended() : Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao"))
-      .as('count as 'a, 'word as 'b)
-    val result = expr.filter("a % 2 = 0").explain()
+    val table = env.fromElements((1, "hello")).as('a, 'b)
+
+    val result = table.filter("a % 2 = 0").explain()
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testFilter0.out").mkString
     assertEquals(result, source)
@@ -52,9 +50,9 @@ class SqlExplainTest
   @Test
   def testFilterWithExtended() : Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val expr = env.fromElements(WC(1, "hello"), WC(2, "hello"), WC(3, "ciao"))
-      .as('count as 'a, 'word as 'b)
-    val result = expr.filter("a % 2 = 0").explain(true)
+    val table = env.fromElements((1, "hello")).as('a, 'b)
+
+    val result = table.filter("a % 2 = 0").explain(true)
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testFilter1.out").mkString
     assertEquals(result, source)
@@ -63,11 +61,10 @@ class SqlExplainTest
   @Test
   def testJoinWithoutExtended() : Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao"))
-      .as('count as 'a, 'word as 'b)
-    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java"))
-      .as('count as 'c, 'word as 'd)
-    val result = expr1.join(expr2).where("b = d").select("a, c").explain()
+    val table1 = env.fromElements((1, "hello")).as('a, 'b)
+    val table2 = env.fromElements((1, "hello")).as('c, 'd)
+
+    val result = table1.join(table2).where("b = d").select("a, c").explain()
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testJoin0.out").mkString
     assertEquals(result, source)
@@ -76,11 +73,10 @@ class SqlExplainTest
   @Test
   def testJoinWithExtended() : Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao"))
-      .as('count as 'a, 'word as 'b)
-    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java"))
-      .as('count as 'c, 'word as 'd)
-    val result = expr1.join(expr2).where("b = d").select("a, c").explain(true)
+    val table1 = env.fromElements((1, "hello")).as('a, 'b)
+    val table2 = env.fromElements((1, "hello")).as('c, 'd)
+
+    val result = table1.join(table2).where("b = d").select("a, c").explain(true)
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testJoin1.out").mkString
     assertEquals(result, source)
@@ -89,9 +85,10 @@ class SqlExplainTest
   @Test
   def testUnionWithoutExtended() : Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
-    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
-    val result = expr1.unionAll(expr2).explain()
+    val table1 = env.fromElements((1, "hello")).as('count, 'word)
+    val table2 = env.fromElements((1, "hello")).as('count, 'word)
+
+    val result = table1.unionAll(table2).explain()
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testUnion0.out").mkString
     assertEquals(result, source)
@@ -100,9 +97,10 @@ class SqlExplainTest
   @Test
   def testUnionWithExtended() : Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val expr1 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "ciao")).toTable
-    val expr2 = env.fromElements(WC(1, "hello"), WC(1, "hello"), WC(1, "java")).toTable
-    val result = expr1.unionAll(expr2).explain(true)
+    val table1 = env.fromElements((1, "hello")).as('count, 'word)
+    val table2 = env.fromElements((1, "hello")).as('count, 'word)
+
+    val result = table1.unionAll(table2).explain(true)
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testUnion1.out").mkString
     assertEquals(result, source)

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index 7977547..310d133 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -60,9 +60,10 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
+      // must fail, second argument of substring must be Integer not Double.
       .select('a.substring(0, 'b))
 
-    val results = t.toDataSet[Row].collect()
+    t.toDataSet[Row].collect()
   }
 
   @Test(expected = classOf[CodeGenException])
@@ -70,9 +71,10 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
+      // must fail, first argument of substring must be Integer not String.
       .select('a.substring('b, 15))
 
-    val results = t.toDataSet[Row].collect()
+    t.toDataSet[Row].collect()
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
index 1263eba..a155935 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
@@ -80,29 +80,23 @@ class UnionITCase(
   }
 
   @Test(expected = classOf[IllegalArgumentException])
-  def testUnionFieldsNameNotOverlap1(): Unit = {
+  def testUnionDifferentFieldNames(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'd, 'c, 'e)
 
-    val unionDs = ds1.unionAll(ds2)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = ""
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    // must fail. Union inputs have different field names.
+    ds1.unionAll(ds2)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
-  def testUnionFieldsNameNotOverlap2(): Unit = {
+  def testUnionDifferentFieldTypes(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'c, 'd, 'e).select('a, 'b, 'c)
 
-    val unionDs = ds1.unionAll(ds2)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = ""
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    // must fail. Union inputs have different field types.
+    ds1.unionAll(ds2)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/29fb3d27/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
index 78be519..a971136 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
@@ -67,7 +67,6 @@ class TableProgramsTestBase(
       case _ => // keep default
     }
   }
-
 }
 
 object TableProgramsTestBase {


Mime
View raw message