flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [14/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.
Date Fri, 16 Dec 2016 15:46:43 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
deleted file mode 100644
index 53a1a7d..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.stream.sql;
-
-import org.apache.flink.api.java.table.StreamTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.scala.stream.utils.StreamITCase;
-import org.apache.flink.types.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.api.java.stream.utils.StreamTestData;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class SqlITCase extends StreamingMultipleProgramsTestBase {
-
-	@Test
-	public void testSelect() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		StreamITCase.clear();
-
-		DataStream<Tuple3<Integer, Long, String>> ds = StreamTestData.getSmall3TupleDataSet(env);
-		Table in = tableEnv.fromDataStream(ds, "a,b,c");
-		tableEnv.registerTable("MyTable", in);
-
-		String sqlQuery = "SELECT * FROM MyTable";
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
-		env.execute();
-
-		List<String> expected = new ArrayList<>();
-		expected.add("1,1,Hi");
-		expected.add("2,2,Hello");
-		expected.add("3,2,Hello world");
-
-		StreamITCase.compareWithList(expected);
-	}
-
-	@Test
-	public void testFilter() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		StreamITCase.clear();
-
-		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
-		tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
-
-		String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4";
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
-		env.execute();
-
-		List<String> expected = new ArrayList<>();
-		expected.add("1,1,1");
-		expected.add("2,2,2");
-		expected.add("2,3,1");
-		expected.add("3,4,2");
-
-		StreamITCase.compareWithList(expected);
-	}
-
-	@Test
-	public void testUnion() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		StreamITCase.clear();
-
-		DataStream<Tuple3<Integer, Long, String>> ds1 = StreamTestData.getSmall3TupleDataSet(env);
-		Table t1 = tableEnv.fromDataStream(ds1, "a,b,c");
-		tableEnv.registerTable("T1", t1);
-
-		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds2 = StreamTestData.get5TupleDataStream(env);
-		tableEnv.registerDataStream("T2", ds2, "a, b, d, c, e");
-
-		String sqlQuery = "SELECT * FROM T1 " +
-							"UNION ALL " +
-							"(SELECT a, b, c FROM T2 WHERE a	< 3)";
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
-		env.execute();
-
-		List<String> expected = new ArrayList<>();
-		expected.add("1,1,Hi");
-		expected.add("2,2,Hello");
-		expected.add("3,2,Hello world");
-		expected.add("1,1,Hallo");
-		expected.add("2,2,Hallo Welt");
-		expected.add("2,3,Hallo Welt wie");
-
-		StreamITCase.compareWithList(expected);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/utils/StreamTestData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/utils/StreamTestData.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/utils/StreamTestData.java
deleted file mode 100644
index 82ebf95..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/utils/StreamTestData.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.stream.utils;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class StreamTestData {
-
-	public static DataStream<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env) {
-
-		List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
-		data.add(new Tuple3<>(1, 1L, "Hi"));
-		data.add(new Tuple3<>(2, 2L, "Hello"));
-		data.add(new Tuple3<>(3, 2L, "Hello world"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	public static DataStream<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataStream(StreamExecutionEnvironment env) {
-
-		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
-		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
-		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
-		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
-		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
-		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
-		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
-		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
-		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
-		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
-		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
-		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
-		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
-		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
-		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
-		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
-		return env.fromCollection(data);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
new file mode 100644
index 0000000..114579f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.batch;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Scanner;
+
+import static org.junit.Assert.assertEquals;
+
+public class ExplainTest extends MultipleProgramsTestBase {
+
+	public ExplainTest() {
+		super(TestExecutionMode.CLUSTER);
+	}
+
+	private static String testFilePath = ExplainTest.class.getResource("/").getFile();
+
+	@Test
+	public void testFilterWithoutExtended() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple2<Integer, String>> input = env.fromElements(new Tuple2<>(1,"d"));
+		Table table = tableEnv
+			.fromDataSet(input, "a, b")
+			.filter("a % 2 = 0");
+
+		String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n");
+		try (Scanner scanner = new Scanner(new File(testFilePath +
+			"../../src/test/scala/resources/testFilter0.out"))){
+			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
+			assertEquals(source, result);
+		}
+	}
+
+	@Test
+	public void testFilterWithExtended() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple2<Integer, String>> input = env.fromElements(new Tuple2<>(1,"d"));
+		Table table = tableEnv
+			.fromDataSet(input, "a, b")
+			.filter("a % 2 = 0");
+
+		String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n");
+		try (Scanner scanner = new Scanner(new File(testFilePath +
+			"../../src/test/scala/resources/testFilter1.out"))){
+			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
+			assertEquals(source, result);
+		}
+	}
+
+	@Test
+	public void testJoinWithoutExtended() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
+		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
+		Table table1 = tableEnv.fromDataSet(input1, "a, b");
+		Table table2 = tableEnv.fromDataSet(input2, "c, d");
+		Table table = table1
+			.join(table2)
+			.where("b = d")
+			.select("a, c");
+
+		String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n");
+		try (Scanner scanner = new Scanner(new File(testFilePath +
+			"../../src/test/scala/resources/testJoin0.out"))){
+			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
+			assertEquals(source, result);
+		}
+	}
+
+	@Test
+	public void testJoinWithExtended() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
+		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
+		Table table1 = tableEnv.fromDataSet(input1, "a, b");
+		Table table2 = tableEnv.fromDataSet(input2, "c, d");
+		Table table = table1
+			.join(table2)
+			.where("b = d")
+			.select("a, c");
+
+		String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n");
+		try (Scanner scanner = new Scanner(new File(testFilePath +
+			"../../src/test/scala/resources/testJoin1.out"))){
+			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
+			assertEquals(source, result);
+		}
+	}
+
+	@Test
+	public void testUnionWithoutExtended() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
+		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
+		Table table1 = tableEnv.fromDataSet(input1, "count, word");
+		Table table2 = tableEnv.fromDataSet(input2, "count, word");
+		Table table = table1.unionAll(table2);
+
+		String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n");
+		try (Scanner scanner = new Scanner(new File(testFilePath +
+			"../../src/test/scala/resources/testUnion0.out"))){
+			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
+			assertEquals(source, result);
+		}
+	}
+
+	@Test
+	public void testUnionWithExtended() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
+		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
+		Table table1 = tableEnv.fromDataSet(input1, "count, word");
+		Table table2 = tableEnv.fromDataSet(input2, "count, word");
+		Table table = table1.unionAll(table2);
+
+		String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n");
+		try (Scanner scanner = new Scanner(new File(testFilePath +
+			"../../src/test/scala/resources/testUnion1.out"))){
+			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
+			assertEquals(source, result);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
new file mode 100644
index 0000000..67eb2d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.batch;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.calcite.tools.RuleSets;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.table.calcite.CalciteConfig;
+import org.apache.flink.table.calcite.CalciteConfigBuilder;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TableEnvironmentITCase extends TableProgramsTestBase {
+
+	public TableEnvironmentITCase(TestExecutionMode mode, TableConfigMode configMode) {
+		super(mode, configMode);
+	}
+
+	@Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+	public static Collection<Object[]> parameters() {
+		return Arrays.asList(new Object[][] {
+			{ TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT() },
+			{ TestExecutionMode.COLLECTION, TableProgramsTestBase.EFFICIENT() }
+		});
+	}
+
+	@Test
+	public void testSimpleRegister() throws Exception {
+		final String tableName = "MyTable";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet(tableName, ds);
+		Table t = tableEnv.scan(tableName);
+
+		Table result = t.select("f0, f1");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testRegisterWithFields() throws Exception {
+		final String tableName = "MyTable";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet(tableName, ds, "a, b, c");
+		Table t = tableEnv.scan(tableName);
+
+		Table result = t.select("a, b, c");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+				"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+				"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+				"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+				"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+				"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+				"20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = TableException.class)
+	public void testRegisterExistingDatasetTable() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet("MyTable", ds);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 =
+				CollectionDataSets.getSmall5TupleDataSet(env);
+		// Must fail. Name is already used for different table.
+		tableEnv.registerDataSet("MyTable", ds2);
+	}
+
+	@Test(expected = TableException.class)
+	public void testScanUnregisteredTable() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. No table registered under that name.
+		tableEnv.scan("nonRegisteredTable");
+	}
+
+	@Test
+	public void testTableRegister() throws Exception {
+		final String tableName = "MyTable";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table t = tableEnv.fromDataSet(ds);
+		tableEnv.registerTable(tableName, t);
+		Table result = tableEnv.scan(tableName).select("f0, f1").filter("f0 > 7");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" +
+				"13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = TableException.class)
+	public void testIllegalName() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table t = tableEnv.fromDataSet(ds);
+		// Must fail. Table name matches internal name pattern.
+		tableEnv.registerTable("_DataSetTable_42", t);
+	}
+
+	@Test(expected = TableException.class)
+	public void testRegisterTableFromOtherEnv() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv1 = TableEnvironment.getTableEnvironment(env, config());
+		BatchTableEnvironment tableEnv2 = TableEnvironment.getTableEnvironment(env, config());
+
+		Table t = tableEnv1.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
+		// Must fail. Table is bound to different TableEnvironment.
+		tableEnv2.registerTable("MyTable", t);
+	}
+
+	@Test
+	public void testAsFromTuple() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table table = tableEnv
+			.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
+			.select("a, b, c");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+			"20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromAndToTuple() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table table = tableEnv
+			.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
+			.select("a, b, c");
+
+		TypeInformation<?> ti = new TupleTypeInfo<Tuple3<Integer, Long, String>>(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		DataSet<?> ds = tableEnv.toDataSet(table, ti);
+		List<?> results = ds.collect();
+		String expected = "(1,1,Hi)\n" + "(2,2,Hello)\n" + "(3,2,Hello world)\n" +
+			"(4,3,Hello world, how are you?)\n" + "(5,3,I am fine.)\n" + "(6,3,Luke Skywalker)\n" +
+			"(7,4,Comment#1)\n" + "(8,4,Comment#2)\n" + "(9,4,Comment#3)\n" + "(10,4,Comment#4)\n" +
+			"(11,5,Comment#5)\n" + "(12,5,Comment#6)\n" + "(13,5,Comment#7)\n" +
+			"(14,5,Comment#8)\n" + "(15,5,Comment#9)\n" + "(16,6,Comment#10)\n" +
+			"(17,6,Comment#11)\n" + "(18,6,Comment#12)\n" + "(19,6,Comment#13)\n" +
+			"(20,6,Comment#14)\n" + "(21,6,Comment#15)\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromTupleToPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<Tuple4<String, Integer, Double, String>> data = new ArrayList<>();
+		data.add(new Tuple4<>("Rofl", 1, 1.0, "Hi"));
+		data.add(new Tuple4<>("lol", 2, 1.0, "Hi"));
+		data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data), "a, b, c, d")
+			.select("a, b, c, d");
+
+		DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
+		List<SmallPojo2> results = ds.collect();
+		String expected = "Rofl,1,1.0,Hi\n" + "lol,2,1.0,Hi\n" + "Test me,4,3.33,Hello world\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<SmallPojo> data = new ArrayList<>();
+		data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
+		data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
+		data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"department AS a, " +
+				"age AS b, " +
+				"salary AS c, " +
+				"name AS d")
+			.select("a, b, c, d");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected =
+			"Sales,28,4000.0,Peter\n" +
+			"Engineering,56,10000.0,Anna\n" +
+			"HR,42,6000.0,Lucy\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromPrivateFieldsPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<PrivateSmallPojo> data = new ArrayList<>();
+		data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
+		data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
+		data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"department AS a, " +
+				"age AS b, " +
+				"salary AS c, " +
+				"name AS d")
+			.select("a, b, c, d");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected =
+			"Sales,28,4000.0,Peter\n" +
+			"Engineering,56,10000.0,Anna\n" +
+			"HR,42,6000.0,Lucy\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromAndToPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<SmallPojo> data = new ArrayList<>();
+		data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
+		data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
+		data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"department AS a, " +
+				"age AS b, " +
+				"salary AS c, " +
+				"name AS d")
+			.select("a, b, c, d");
+
+		DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
+		List<SmallPojo2> results = ds.collect();
+		String expected =
+			"Sales,28,4000.0,Peter\n" +
+			"Engineering,56,10000.0,Anna\n" +
+			"HR,42,6000.0,Lucy\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromAndToPrivateFieldPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<PrivateSmallPojo> data = new ArrayList<>();
+		data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
+		data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
+		data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"department AS a, " +
+				"age AS b, " +
+				"salary AS c, " +
+				"name AS d")
+			.select("a, b, c, d");
+
+		DataSet<PrivateSmallPojo2> ds = tableEnv.toDataSet(table, PrivateSmallPojo2.class);
+		List<PrivateSmallPojo2> results = ds.collect();
+		String expected =
+			"Sales,28,4000.0,Peter\n" +
+			"Engineering,56,10000.0,Anna\n" +
+			"HR,42,6000.0,Lucy\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsWithPojoAndGenericTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<PojoWithGeneric> data = new ArrayList<>();
+		data.add(new PojoWithGeneric("Peter", 28, new HashMap<String, String>(), new ArrayList<String>()));
+		HashMap<String, String> hm1 = new HashMap<>();
+		hm1.put("test1", "test1");
+		data.add(new PojoWithGeneric("Anna", 56, hm1, new ArrayList<String>()));
+		HashMap<String, String> hm2 = new HashMap<>();
+		hm2.put("abc", "cde");
+		data.add(new PojoWithGeneric("Lucy", 42, hm2, new ArrayList<String>()));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"name AS a, " +
+				"age AS b, " +
+				"generic AS c, " +
+				"generic2 AS d")
+			.select("a, b, c, c as c2, d")
+			.select("a, b, c, c === c2, d");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected =
+			"Peter,28,{},true,[]\n" +
+			"Anna,56,{test1=test1},true,[]\n" +
+			"Lucy,42,{abc=cde},true,[]\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithToFewFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. Not enough field names specified.
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithToManyFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. Too many field names specified.
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithAmbiguousFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. Specified field names are not unique.
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithNonFieldReference1() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. as() does only allow field name expressions
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithNonFieldReference2() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. as() does only allow field name expressions
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b,  c");
+	}
+
+	@Test(expected = TableException.class)
+	public void testNonStaticClassInput() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail since class is not static
+		tableEnv.fromDataSet(env.fromElements(new MyNonStatic()), "name");
+	}
+
+	@Test(expected = TableException.class)
+	public void testNonStaticClassOutput() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail since class is not static
+		Table t = tableEnv.fromDataSet(env.fromElements(1, 2, 3), "number");
+		tableEnv.toDataSet(t, MyNonStatic.class);
+	}
+
+	@Test(expected = TableException.class)
+	public void testCustomCalciteConfig() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		CalciteConfig cc = new CalciteConfigBuilder().replaceRuleSet(RuleSets.ofList()).build();
+		tableEnv.getConfig().setCalciteConfig(cc);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table t = tableEnv.fromDataSet(ds);
+		tableEnv.toDataSet(t, Row.class);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public class MyNonStatic {
+		public int number;
+	}
+
+	@SuppressWarnings("unused")
+	public static class SmallPojo {
+
+		public SmallPojo() { }
+
+		public SmallPojo(String name, int age, double salary, String department) {
+			this.name = name;
+			this.age = age;
+			this.salary = salary;
+			this.department = department;
+		}
+
+		public String name;
+		public int age;
+		public double salary;
+		public String department;
+	}
+
+	@SuppressWarnings("unused")
+	public static class PojoWithGeneric {
+		public String name;
+		public int age;
+		public HashMap<String, String> generic;
+		public ArrayList<String> generic2;
+
+		public PojoWithGeneric() {
+			// default constructor
+		}
+
+		public PojoWithGeneric(String name, int age, HashMap<String, String> generic,
+				ArrayList<String> generic2) {
+			this.name = name;
+			this.age = age;
+			this.generic = generic;
+			this.generic2 = generic2;
+		}
+
+		@Override
+		public String toString() {
+			return name + "," + age + "," + generic + "," + generic2;
+		}
+	}
+
+	@SuppressWarnings("unused")
+	public static class PrivateSmallPojo {
+
+		public PrivateSmallPojo() { }
+
+		public PrivateSmallPojo(String name, int age, double salary, String department) {
+			this.name = name;
+			this.age = age;
+			this.salary = salary;
+			this.department = department;
+		}
+
+		private String name;
+		private int age;
+		private double salary;
+		private String department;
+
+		public String getName() {
+			return name;
+		}
+
+		public void setName(String name) {
+			this.name = name;
+		}
+
+		public int getAge() {
+			return age;
+		}
+
+		public void setAge(int age) {
+			this.age = age;
+		}
+
+		public double getSalary() {
+			return salary;
+		}
+
+		public void setSalary(double salary) {
+			this.salary = salary;
+		}
+
+		public String getDepartment() {
+			return department;
+		}
+
+		public void setDepartment(String department) {
+			this.department = department;
+		}
+	}
+
+	@SuppressWarnings("unused")
+	public static class SmallPojo2 {
+
+		public SmallPojo2() { }
+
+		public SmallPojo2(String a, int b, double c, String d) {
+			this.a = a;
+			this.b = b;
+			this.c = c;
+			this.d = d;
+		}
+
+		public String a;
+		public int b;
+		public double c;
+		public String d;
+
+		@Override
+		public String toString() {
+			return a + "," + b + "," + c + "," + d;
+		}
+	}
+
+	@SuppressWarnings("unused")
+	public static class PrivateSmallPojo2 {
+
+		public PrivateSmallPojo2() { }
+
+		public PrivateSmallPojo2(String a, int b, double c, String d) {
+			this.a = a;
+			this.b = b;
+			this.c = c;
+			this.d = d;
+		}
+
+		private String a;
+		private int b;
+		private double c;
+		private String d;
+
+		public String getA() {
+			return a;
+		}
+
+		public void setA(String a) {
+			this.a = a;
+		}
+
+		public int getB() {
+			return b;
+		}
+
+		public void setB(int b) {
+			this.b = b;
+		}
+
+		public double getC() {
+			return c;
+		}
+
+		public void setC(double c) {
+			this.c = c;
+		}
+
+		public String getD() {
+			return d;
+		}
+
+		public void setD(String d) {
+			this.d = d;
+		}
+
+		@Override
+		public String toString() {
+			return a + "," + b + "," + c + "," + d;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
new file mode 100644
index 0000000..c822efb
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.batch;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.scala.batch.GeneratingInputFormat;
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TableSourceITCase extends TableProgramsTestBase {
+
+	public TableSourceITCase(TestExecutionMode mode, TableConfigMode configMode) {
+		super(mode, configMode);
+	}
+
+	@Test
+	public void testBatchTableSourceTableAPI() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		tableEnv.registerTableSource("MyTable", new TestBatchTableSource());
+
+		Table result = tableEnv.scan("MyTable")
+			.where("amount < 4")
+			.select("amount * id, name");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+
+		String expected = "0,Record_0\n" + "0,Record_16\n" + "0,Record_32\n" + "1,Record_1\n" +
+			"17,Record_17\n" + "36,Record_18\n" + "4,Record_2\n" + "57,Record_19\n" + "9,Record_3\n";
+
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testBatchTableSourceSQL() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		tableEnv.registerTableSource("MyTable", new TestBatchTableSource());
+
+		Table result = tableEnv
+			.sql("SELECT amount * id, name FROM MyTable WHERE amount < 4");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+
+		String expected = "0,Record_0\n" + "0,Record_16\n" + "0,Record_32\n" + "1,Record_1\n" +
+			"17,Record_17\n" + "36,Record_18\n" + "4,Record_2\n" + "57,Record_19\n" + "9,Record_3\n";
+
+		compareResultAsText(results, expected);
+	}
+
+	public static class TestBatchTableSource implements BatchTableSource<Row> {
+
+		private TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO
+		};
+
+		@Override
+		public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
+			return execEnv.createInput(new GeneratingInputFormat(33), getReturnType()).setParallelism(1);
+		}
+
+		@Override
+		public int getNumberOfFields() {
+			return 3;
+		}
+
+		@Override
+		public String[] getFieldsNames() {
+			return new String[]{"name", "id", "amount"};
+		}
+
+		@Override
+		public TypeInformation<?>[] getFieldTypes() {
+			return fieldTypes;
+		}
+
+		@Override
+		public TypeInformation<Row> getReturnType() {
+			return new RowTypeInfo(fieldTypes);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
new file mode 100644
index 0000000..433410c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.batch.sql;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class SqlITCase extends TableProgramsTestBase {
+
+	public SqlITCase(TestExecutionMode mode, TableConfigMode configMode) {
+		super(mode, configMode);
+	}
+
+	@Test
+	public void testValues() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		String sqlQuery = "VALUES (1, 'Test', TRUE, DATE '1944-02-24', 12.4444444444444445)," +
+			"(2, 'Hello', TRUE, DATE '1944-02-24', 12.666666665)," +
+			"(3, 'World', FALSE, DATE '1944-12-24', 12.54444445)";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+
+		List<Row> results = resultSet.collect();
+		String expected = "3,World,false,1944-12-24,12.5444444500000000\n" +
+			"2,Hello,true,1944-02-24,12.6666666650000000\n" +
+			// Calcite converts to decimals and strings with equal length
+			"1,Test ,true,1944-02-24,12.4444444444444445\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testSelectFromTable() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds, "a,b,c");
+		tableEnv.registerTable("T", in);
+
+		String sqlQuery = "SELECT a, c FROM T";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
+			"4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n" +
+			"7,Comment#1\n" + "8,Comment#2\n" + "9,Comment#3\n" + "10,Comment#4\n" +
+			"11,Comment#5\n" + "12,Comment#6\n" + "13,Comment#7\n" +
+			"14,Comment#8\n" + "15,Comment#9\n" + "16,Comment#10\n" +
+			"17,Comment#11\n" + "18,Comment#12\n" + "19,Comment#13\n" +
+			"20,Comment#14\n" + "21,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testFilterFromDataSet() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet("DataSetTable", ds, "x, y, z");
+
+		String sqlQuery = "SELECT x FROM DataSetTable WHERE z LIKE '%Hello%'";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "2\n" + "3\n" + "4";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAggregation() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet("AggTable", ds, "x, y, z");
+
+		String sqlQuery = "SELECT sum(x), min(x), max(x), count(y), avg(x) FROM AggTable";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "231,1,21,21,11";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testJoin() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		tableEnv.registerDataSet("t1", ds1, "a, b, c");
+		tableEnv.registerDataSet("t2",ds2, "d, e, f, g, h");
+
+		String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+		compareResultAsText(results, expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
new file mode 100644
index 0000000..d37ebb5
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.java.batch.table;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.examples.java.WordCountTable.WC;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class AggregationsITCase extends TableProgramsTestBase {
+
+	public AggregationsITCase(TestExecutionMode mode, TableConfigMode configMode){
+		super(mode, configMode);
+	}
+
+	@Test
+	public void testAggregationTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
+
+		Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "231,1,21,21,11";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testAggregationOnNonExistingField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table table =
+				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
+
+		Table result =
+				table.select("foo.avg");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testWorkingAggregationDataTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
+				env.fromElements(
+						new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
+						new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
+
+		Table table = tableEnv.fromDataSet(input);
+
+		Table result =
+				table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,1,1,1.5,1.5,2";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAggregationWithArithmetic() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSource<Tuple2<Float, String>> input =
+				env.fromElements(
+						new Tuple2<>(1f, "Hello"),
+						new Tuple2<>(2f, "Ciao"));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
+		Table result =
+				table.select("(f0 + 2).avg + 2, f1.count + 5");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "5.5,7";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAggregationWithTwoCount() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSource<Tuple2<Float, String>> input =
+			env.fromElements(
+				new Tuple2<>(1f, "Hello"),
+				new Tuple2<>(2f, "Ciao"));
+
+		Table table =
+			tableEnv.fromDataSet(input);
+
+		Table result =
+			table.select("f0.count, f1.count");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2,2";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testNonWorkingDataTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
+		Table result =
+				// Must fail. Cannot compute SUM aggregate on String field.
+				table.select("f1.sum");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testNoNestedAggregation() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
+		Table result =
+				// Must fail. Aggregation on aggregation not allowed.
+				table.select("f0.sum.sum");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testGroupingOnNonExistentField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		tableEnv
+			.fromDataSet(input, "a, b, c")
+			// must fail. Field foo is not in input
+			.groupBy("foo")
+			.select("a.avg");
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testGroupingInvalidSelection() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		tableEnv
+			.fromDataSet(input, "a, b, c")
+			.groupBy("a, b")
+			// must fail. Field c is not a grouping key or aggregation
+			.select("c");
+	}
+
+	@Test
+	public void testGroupedAggregate() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.groupBy("b").select("b, a.sum");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testGroupingKeyForwardIfNotUsed() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.groupBy("b").select("a.sum");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testGroupNoAggregation() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+			.groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
+		List<Row> results = ds.collect();
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testPojoAggregation() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+		DataSet<WC> input = env.fromElements(
+				new WC("Hello", 1),
+				new WC("Ciao", 1),
+				new WC("Hello", 1),
+				new WC("Hola", 1),
+				new WC("Hola", 1));
+
+		Table table = tableEnv.fromDataSet(input);
+
+		Table filtered = table
+				.groupBy("word")
+				.select("word.count as frequency, word")
+				.filter("frequency = 2");
+
+		List<String> result = tableEnv.toDataSet(filtered, WC.class)
+				.map(new MapFunction<WC, String>() {
+					public String map(WC value) throws Exception {
+						return value.word;
+					}
+				}).collect();
+		String expected = "Hello\n" + "Hola";
+		compareResultAsText(result, expected);
+	}
+
+	@Test
+	public void testPojoGrouping() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<String, Double, String>> data = env.fromElements(
+			new Tuple3<>("A", 23.0, "Z"),
+			new Tuple3<>("A", 24.0, "Y"),
+			new Tuple3<>("B", 1.0, "Z"));
+
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table table = tableEnv
+			.fromDataSet(data, "groupMe, value, name")
+			.select("groupMe, value, name")
+			.where("groupMe != 'B'");
+
+		DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
+
+		DataSet<MyPojo> result = myPojos.groupBy("groupMe")
+			.sortGroup("value", Order.DESCENDING)
+			.first(1);
+
+		List<MyPojo> resultList = result.collect();
+		compareResultAsText(resultList, "A,24.0,Y");
+	}
+
+	@Test
+	public void testDistinct() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table distinct = table.select("b").distinct();
+
+		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1\n" + "2\n" + "3\n"+ "4\n"+ "5\n"+ "6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testDistinctAfterAggregate() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env);
+
+		Table table = tableEnv.fromDataSet(input, "a, b, c, d, e");
+
+		Table distinct = table.groupBy("a, e").select("e").distinct();
+
+		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1\n" + "2\n" + "3\n";
+		compareResultAsText(results, expected);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public static class MyPojo implements Serializable {
+		private static final long serialVersionUID = 8741918940120107213L;
+
+		public String groupMe;
+		public double value;
+		public String name;
+
+		public MyPojo() {
+			// for serialization
+		}
+
+		public MyPojo(String groupMe, double value, String name) {
+			this.groupMe = groupMe;
+			this.value = value;
+			this.name = name;
+		}
+
+		@Override
+		public String toString() {
+			return groupMe + "," + value + "," + name;
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
new file mode 100644
index 0000000..b1ef563
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.batch.table;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class CalcITCase extends TableProgramsTestBase {
+
+	public CalcITCase(TestExecutionMode mode, TableConfigMode configMode){
+		super(mode, configMode);
+	}
+
+	@Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+	public static Collection<Object[]> parameters() {
+		return Arrays.asList(new Object[][] {
+			{ TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT() },
+			{ TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL() }
+		});
+	}
+
+	@Test
+	public void testSimpleSelectAllWithAs() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds, "a,b,c");
+
+		Table result = in
+				.select("a, b, c");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+			"20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testSimpleSelectWithNaming() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds);
+
+		Table result = in
+				.select("f0 as a, f1 as b")
+				.select("a, b");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testSimpleSelectRenameAll() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds);
+
+		Table result = in
+			.select("f0 as a, f1 as b, f2 as c")
+			.select("a, b");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+			"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+			"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testSelectInvalidField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		tableEnv.fromDataSet(ds, "a, b, c")
+			// Must fail. Field foo does not exist
+			.select("a + 1, foo + 2");
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testSelectAmbiguousFieldNames() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		tableEnv.fromDataSet(ds, "a, b, c")
+			// Must fail. Field foo does not exist
+			.select("a + 1 as foo, b + 2 as foo");
+	}
+
+	@Test
+	public void testSelectStar() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds, "a,b,c");
+
+		Table result = in
+			.select("*");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+		                  "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+		                  "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+		                  "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+		                  "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+		                  "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+		                  "20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAllRejectingFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.filter("false");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAllPassingFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.filter("true");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+			"20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testFilterOnIntegerTupleField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.filter(" a % 2 = 0 ");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testNotEquals() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.filter("!( a % 2 <> 0 ) ");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testDisjunctivePreds() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+			.filter("a < 2 || a > 20");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,Hi\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testIntegerBiggerThan128() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+			.filter("a = 300 ");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "300,1,Hello\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testFilterInvalidField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		table
+			// Must fail. Field foo does not exist.
+			.filter("foo = 17");
+	}
+
+	public static class OldHashCode extends ScalarFunction {
+		public int eval(String s) {
+			return -1;
+		}
+	}
+
+	public static class HashCode extends ScalarFunction {
+		public int eval(String s) {
+			return s.hashCode();
+		}
+	}
+
+	@Test
+	public void testUserDefinedScalarFunction() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		tableEnv.registerFunction("hashCode", new OldHashCode());
+		tableEnv.registerFunction("hashCode", new HashCode());
+
+		DataSource<String> input = env.fromElements("a", "b", "c");
+
+		Table table = tableEnv.fromDataSet(input, "text");
+
+		Table result = table.select("text.hashCode()");
+
+		DataSet<Integer> ds = tableEnv.toDataSet(result, Integer.class);
+		List<Integer> results = ds.collect();
+		String expected = "97\n98\n99";
+		compareResultAsText(results, expected);
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
new file mode 100644
index 0000000..b1bb6e8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.batch.table;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class CastingITCase extends TableProgramsTestBase {
+
+	public CastingITCase(TestExecutionMode mode, TableConfigMode configMode){
+		super(mode, configMode);
+	}
+
+	@Test
+	public void testNumericAutocastInArithmetic() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSource<Tuple8<Byte, Short, Integer, Long, Float, Double, Long, Double>> input =
+				env.fromElements(new Tuple8<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, 1L, 1001.1));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
+		Table result = table.select("f0 + 1, f1 +" +
+				" 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1, f6 + 1.0d, f7 + f0");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testNumericAutocastInComparison() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSource<Tuple6<Byte, Short, Integer, Long, Float, Double>> input =
+				env.fromElements(
+						new Tuple6<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d),
+						new Tuple6<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d));
+
+		Table table =
+				tableEnv.fromDataSet(input, "a,b,c,d,e,f");
+
+		Table result = table
+				.filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2,2,2,2,2.0,2.0";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testCasting() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSource<Tuple4<Integer, Double, Long, Boolean>> input =
+				env.fromElements(new Tuple4<>(1, 0.0, 1L, true));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
+		Table result = table.select(
+				// * -> String
+				"f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)," +
+				// NUMERIC TYPE -> Boolean
+				"f0.cast(BOOL), f1.cast(BOOL), f2.cast(BOOL)," +
+				// NUMERIC TYPE -> NUMERIC TYPE
+				"f0.cast(DOUBLE), f1.cast(INT), f2.cast(SHORT)," +
+				// Boolean -> NUMERIC TYPE
+				"f3.cast(DOUBLE)," +
+				// identity casting
+				"f0.cast(INT), f1.cast(DOUBLE), f2.cast(LONG), f3.cast(BOOL)");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,0.0,1,true," +
+			"true,false,true," +
+			"1.0,0,1," +
+			"1.0," +
+			"1,0.0,1,true\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testCastFromString() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSource<Tuple3<String, String, String>> input =
+				env.fromElements(new Tuple3<>("1", "true", "2.0"));
+
+		Table table =
+				tableEnv.fromDataSet(input);
+
+		Table result = table.select(
+				"f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,1,1,2.0,2.0,true\n";
+		compareResultAsText(results, expected);
+	}
+}
+


Mime
View raw message