flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [40/50] [abbrv] flink git commit: [FLINK-3225] Implemented optimization of Table API queries via Calcite
Date Fri, 12 Feb 2016 11:30:05 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
index da82b8a..a3d31da 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.table.test;
 
-import org.apache.flink.api.table.ExpressionException;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.java.DataSet;
@@ -30,6 +29,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.NotImplementedError;
 
 import java.util.List;
 
@@ -41,7 +41,7 @@ public class SelectITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	@Test
+	@Test(expected = NotImplementedError.class)
 	public void testSimpleSelectAllWithAs() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
@@ -53,19 +53,19 @@ public class SelectITCase extends MultipleProgramsTestBase {
 		Table result = in
 				.select("a, b, c");
 
-//		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-//		List<Row> results = resultSet.collect();
-//		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-//				"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-//				"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-//				"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-//				"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-//				"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
-//		compareResultAsText(results, expected);
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+				"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+				"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+				"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+				"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+				"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
 
 	}
 
-	@Test
+	@Test(expected = NotImplementedError.class)
 	public void testSimpleSelectWithNaming() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
@@ -78,12 +78,12 @@ public class SelectITCase extends MultipleProgramsTestBase {
 				.select("f0 as a, f1 as b")
 				.select("a, b");
 
-//		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-//		List<Row> results = resultSet.collect();
-//		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-//				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-//				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
-//		compareResultAsText(results, expected);
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -95,10 +95,10 @@ public class SelectITCase extends MultipleProgramsTestBase {
 
 		Table in = tableEnv.fromDataSet(ds, "a, b");
 
-//		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-//		List<Row> results = resultSet.collect();
-//		String expected = " sorry dude ";
-//		compareResultAsText(results, expected);
+		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = " sorry dude ";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -110,10 +110,10 @@ public class SelectITCase extends MultipleProgramsTestBase {
 
 		Table in = tableEnv.fromDataSet(ds, "a, b, c, d");
 
-//		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-//		List<Row> results = resultSet.collect();
-//		String expected = " sorry dude ";
-//		compareResultAsText(results, expected);
+		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = " sorry dude ";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -125,10 +125,10 @@ public class SelectITCase extends MultipleProgramsTestBase {
 
 		Table in = tableEnv.fromDataSet(ds, "a, b, c, b");
 
-//		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-//		List<Row> results = resultSet.collect();
-//		String expected = " today's not your day ";
-//		compareResultAsText(results, expected);
+		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = " today's not your day ";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -140,9 +140,9 @@ public class SelectITCase extends MultipleProgramsTestBase {
 
 		Table in = tableEnv.fromDataSet(ds, "a, b as c, d");
 
-//		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-//		List<Row> results = resultSet.collect();
-//		String expected = "sorry bro";
-//		compareResultAsText(results, expected);
+		DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "sorry bro";
+		compareResultAsText(results, expected);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
index 12e7203..c4b2f01 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.table.test;
 
+import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -28,6 +29,9 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.NotImplementedError;
+
+import java.util.List;
 
 @RunWith(Parameterized.class)
 public class StringExpressionsITCase extends MultipleProgramsTestBase {
@@ -37,7 +41,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	@Test
+	@Test(expected = NotImplementedError.class)
 	public void testSubstring() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
@@ -51,13 +55,13 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		Table result = in
 				.select("a.substring(0, b)");
 
-//		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-//		List<Row> results = resultSet.collect();
-//		String expected = "AA\nB";
-//		compareResultAsText(results, expected);
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "AA\nB";
+		compareResultAsText(results, expected);
 	}
 
-	@Test
+	@Test(expected = NotImplementedError.class)
 	public void testSubstringWithMaxEnd() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
@@ -71,10 +75,10 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		Table result = in
 				.select("a.substring(b)");
 
-//		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-//		List<Row> results = resultSet.collect();
-//		String expected = "CD\nBCD";
-//		compareResultAsText(results, expected);
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "CD\nBCD";
+		compareResultAsText(results, expected);
 	}
 
 	// Calcite does eagerly check expression types
@@ -93,10 +97,10 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		Table result = in
 				.select("a.substring(0, b)");
 
-//		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-//		List<Row> results = resultSet.collect();
-//		String expected = "";
-//		compareResultAsText(results, expected);
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	// Calcite does eagerly check expression types
@@ -115,9 +119,9 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		Table result = in
 				.select("a.substring(b, 15)");
 
-//		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-//		List<Row> results = resultSet.collect();
-//		String expected = "";
-//		compareResultAsText(results, expected);
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
index de02ee1..d6297d9 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.NotImplementedError;
 
 import java.util.List;
 
@@ -42,7 +43,7 @@ public class UnionITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	@Test
+	@Test(expected = NotImplementedError.class)
 	public void testUnion() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
@@ -55,13 +56,13 @@ public class UnionITCase extends MultipleProgramsTestBase {
 
 		Table selected = in1.unionAll(in2).select("c");
 
-//		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-//		List<Row> results = ds.collect();
-//		String expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n";
-//		compareResultAsText(results, expected);
+		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n";
+		compareResultAsText(results, expected);
 	}
 
-	@Test
+	@Test(expected = NotImplementedError.class)
 	public void testUnionWithFilter() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
@@ -74,10 +75,10 @@ public class UnionITCase extends MultipleProgramsTestBase {
 
 		Table selected = in1.unionAll(in2).where("b < 2").select("c");
 
-//		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-//		List<Row> results = ds.collect();
-//		String expected = "Hi\n" + "Hallo\n";
-//		compareResultAsText(results, expected);
+		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "Hi\n" + "Hallo\n";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -93,10 +94,10 @@ public class UnionITCase extends MultipleProgramsTestBase {
 
 		Table selected = in1.unionAll(in2);
 
-//		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-//		List<Row> results = ds.collect();
-//		String expected = "";
-//		compareResultAsText(results, expected);
+		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -112,13 +113,13 @@ public class UnionITCase extends MultipleProgramsTestBase {
 
 		Table selected = in1.unionAll(in2);
 
-//		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-//		List<Row> results = ds.collect();
-//		String expected = "";
-//		compareResultAsText(results, expected);
+		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
 	}
 
-	@Test
+	@Test(expected = NotImplementedError.class)
 	public void testUnionWithAggregation() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();
@@ -131,10 +132,10 @@ public class UnionITCase extends MultipleProgramsTestBase {
 
 		Table selected = in1.unionAll(in2).select("c.count");
 
-//		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-//		List<Row> results = ds.collect();
-//		String expected = "18";
-//		compareResultAsText(results, expected);
+		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "18";
+		compareResultAsText(results, expected);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 75a113d..76bdcba 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -33,16 +33,16 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testAggregationTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).toTable
       .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
 
-//    val results = t.toDataSet[Row].collect()
-//    val expected = "231,1,21,21,11"
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val results = t.toDataSet[Row].collect()
+    val expected = "231,1,21,21,11"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -52,12 +52,12 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     val t = CollectionDataSets.get3TupleDataSet(env).toTable
       .select('foo.avg)
 
-//    val expected = ""
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = ""
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -66,33 +66,33 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable
       .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
 
-//    val expected = "1,1,1,1,1.5,1.5,2"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "1,1,1,1,1.5,1.5,2"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testAggregationWithArithmetic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
       .select(('_1 + 2).avg + 2, '_2.count + 5)
 
-//    val expected = "5.5,7"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "5.5,7"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testAggregationWithTwoCount(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
       .select('_1.count, '_2.count)
 
-//    val expected = "2,2"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "2,2"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Ignore // Calcite does not eagerly check types
@@ -103,9 +103,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     val t = env.fromElements(("Hello", 1)).toTable
       .select('_1.sum)
 
-//    val expected = ""
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = ""
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -115,12 +115,12 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     val t = env.fromElements(("Hello", 1)).toTable
       .select('_2.sum.sum)
 
-//    val expected = ""
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = ""
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testSQLStyleAggregations(): Unit = {
 
     // the grouping key needs to be forwarded to the intermediate DataSet, even
@@ -136,10 +136,9 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
           |Count(a) as e1, a.count as e2
         """.stripMargin)
 
-//    val expected = "231,231,1,1,21,21,11,11,21,21"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "231,231,1,1,21,21,11,11,21,21"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index a32774f..5ff2b82 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.{Row, ExpressionException}
+import org.apache.flink.api.table.Row
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -33,20 +33,20 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
 
-//    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-//      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-//      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-//      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-//      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-//      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -55,9 +55,9 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
 
-//    val expected = "no"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "no"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -66,9 +66,9 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
 
-//    val expected = "no"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "no"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -77,9 +77,9 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
 
-//    val expected = "no"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "no"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -89,9 +89,9 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
     // as can only have field references
     val t = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b)
 
-//    val expected = "no"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "no"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -101,8 +101,8 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
     // as can only have field references
     val t = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b)
 
-//    val expected = "no"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "no"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 8199f6b..4a37737 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -44,12 +44,12 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable
       .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d", '_7 + "Date")
 
-//    val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testNumericAutoCastInArithmetic(): Unit = {
 
     // don't test everything, just some common cast directions
@@ -58,12 +58,12 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
       .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
 
-//    val expected = "2,2,2,2.0,2.0,2.0"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "2,2,2,2.0,2.0,2.0"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testNumericAutoCastInComparison(): Unit = {
 
     // don't test everything, just some common cast directions
@@ -74,12 +74,12 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
       .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d  && 'f > 1)
 
-//    val expected = "2,2,2,2,2.0,2.0"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "2,2,2,2,2.0,2.0"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testCastFromString: Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -99,14 +99,14 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
         '_6.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
         '_7.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
 
-//    val expected = "1,1,1,1,2.0,2.0,true," +
-//      "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," +
-//      "1970-01-17 17:47:53.775\n"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "1,1,1,1,2.0,2.0,true," +
+      "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," +
+      "1970-01-17 17:47:53.775\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testCastDateToStringAndLong {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000"))
@@ -118,9 +118,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
         'f1.cast(BasicTypeInfo.STRING_TYPE_INFO),
         'f1.cast(BasicTypeInfo.LONG_TYPE_INFO))
 
-//    val expected = "2011-05-03 15:51:36.000,1304437896000," +
-//      "2011-05-03 15:51:36.000,1304437896000\n"
-//    val result = t.toDataSet[Row].collect
-//    TestBaseUtils.compareResultAsText(result.asJava, expected)
+    val expected = "2011-05-03 15:51:36.000,1304437896000," +
+      "2011-05-03 15:51:36.000,1304437896000\n"
+    val result = t.toDataSet[Row].collect
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index a25f3e3..9f20043 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -22,7 +22,7 @@ import java.util.Date
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{Row, ExpressionException}
+import org.apache.flink.api.table.Row
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
@@ -36,43 +36,43 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testArithmetic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements((5, 10)).as('a, 'b)
       .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
 
-//    val expected = "0,10,2,10,1,-5"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "0,10,2,10,1,-5"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testLogic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements((5, true)).as('a, 'b)
       .select('b && true, 'b && false, 'b || false, !'b)
 
-//    val expected = "true,false,true,false"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "true,false,true,false"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testComparisons(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
       .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
 
-//    val expected = "true,true,false,false,true"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "true,true,false,false,true"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testCaseInsensitiveForAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -80,9 +80,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val t = env.fromElements((3, 5.toByte)).as('a, 'b)
       .groupBy("a").select("a, a.count As cnt")
 
-//    val expected = "3,1"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "3,1"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   // Date literals not yet supported
@@ -96,9 +96,9 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
         Literal(new Date(0)).cast(BasicTypeInfo.STRING_TYPE_INFO),
         'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
 
-//    val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
index 853cae6..50ce150 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.table.test
 
+import org.apache.flink.api.table.Row
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -40,12 +41,12 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
       .groupBy('_foo)
       .select('a.avg)
 
-//    val expected = ""
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = ""
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testGroupedAggregate(): Unit = {
 
     // the grouping key needs to be forwarded to the intermediate DataSet, even
@@ -56,12 +57,12 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
       .groupBy('b)
       .select('b, 'a.sum)
 
-//    val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testGroupingKeyForwardIfNotUsed(): Unit = {
 
     // the grouping key needs to be forwarded to the intermediate DataSet, even
@@ -72,12 +73,12 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
       .groupBy('b)
       .select('a.sum)
 
-//    val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testGroupNoAggregation(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -88,9 +89,9 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
       .groupBy('b, 'd)
       .select('b)
 
-//    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index a98b7c8..628613e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.{Row, ExpressionException}
+import org.apache.flink.api.table.Row
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
@@ -41,12 +41,12 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
 
-//    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
-//    val results = joinT.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testJoinWithFilter(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
@@ -54,12 +54,12 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinT = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
 
-//    val expected = "Hi,Hallo\n"
-//    val results = joinT.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "Hi,Hallo\n"
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
@@ -67,10 +67,10 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinT = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
 
-//    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-//      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
-//    val results = joinT.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -81,9 +81,9 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinT = ds1.join(ds2).where('foo === 'e).select('c, 'g)
 
-//    val expected = ""
-//    val results = joinT.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = ""
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   // Calcite does not eagerly check the compatibility of compared types
@@ -96,9 +96,9 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinT = ds1.join(ds2).where('a === 'g).select('c, 'g)
 
-//    val expected = ""
-//    val results = joinT.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = ""
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -109,12 +109,12 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinT = ds1.join(ds2).where('a === 'd).select('c, 'g)
 
-//    val expected = ""
-//    val results = joinT.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = ""
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testJoinWithAggregation(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
@@ -122,9 +122,9 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinT = ds1.join(ds2).where('a === 'd).select('g.count)
 
-//    val expected = "6"
-//    val results = joinT.toDataSet[Row]collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "6"
+    val results = joinT.toDataSet[Row]collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 5791d2e..4dadfe4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -33,39 +33,39 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testSimpleSelectAll(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3)
 
-//    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-//      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-//      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-//      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-//      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-//      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testSimpleSelectAllWithAs(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
 
-//    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-//      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-//      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-//      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-//      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-//      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testSimpleSelectWithNaming(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -73,11 +73,11 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
       .select('_1 as 'a, '_2 as 'b)
       .select('a, 'b)
 
-//    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-//      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-//      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -86,9 +86,9 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
 
-//    val expected = "no"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "no"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -97,9 +97,9 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
 
-//    val expected = "no"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "no"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -108,9 +108,9 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
 
-//    val expected = "no"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "no"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
 
@@ -120,8 +120,8 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd)
 
-//    val expected = "no"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "no"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index e3a3170..565f444 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -32,26 +32,26 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testSubstring(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
       .select('a.substring(0, 'b))
 
-//    val expected = "AA\nB"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "AA\nB"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testSubstringWithMaxEnd(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
       .select('a.substring('b))
 
-//    val expected = "CD\nBCD"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "CD\nBCD"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   // Calcite does eagerly check expression types
@@ -63,9 +63,9 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
     val t = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
       .select('a.substring(0, 'b))
 
-//    val expected = "AAA\nBB"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "AAA\nBB"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   // Calcite does eagerly check expression types
@@ -77,9 +77,9 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
     val t = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
       .select('a.substring('b, 15))
 
-//    val expected = "AAA\nBB"
-//    val results = t.toDataSet[Row].collect()
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val expected = "AAA\nBB"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
index d3c6127..3d03f23 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/UnionITCase.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testUnion(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
@@ -41,12 +41,12 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
 
     val unionDs = ds1.unionAll(ds2).select('c)
 
-//    val results = unionDs.toDataSet[Row].collect()
-//    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val results = unionDs.toDataSet[Row].collect()
+    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testUnionWithFilter(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
@@ -54,9 +54,9 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
 
     val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
 
-//    val results = joinDs.toDataSet[Row].collect()
-//    val expected = "Hi\n" + "Hallo\n"
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val results = joinDs.toDataSet[Row].collect()
+    val expected = "Hi\n" + "Hallo\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -67,9 +67,9 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
 
     val unionDs = ds1.unionAll(ds2)
 
-//    val results = unionDs.toDataSet[Row].collect()
-//    val expected = ""
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val results = unionDs.toDataSet[Row].collect()
+    val expected = ""
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -80,12 +80,12 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
 
     val unionDs = ds1.unionAll(ds2)
 
-//    val results = unionDs.toDataSet[Row].collect()
-//    val expected = ""
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val results = unionDs.toDataSet[Row].collect()
+    val expected = ""
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[NotImplementedError])
   def testUnionWithAggregation(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
@@ -93,8 +93,8 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
 
     val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).select('c.count)
 
-//    val results = unionDs.toDataSet[Row].collect()
-//    val expected = "18"
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
+    val results = unionDs.toDataSet[Row].collect()
+    val expected = "18"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 }


Mime
View raw message