flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [8/9] flink git commit: [FLINK-3847] Restructure flink-table test packages.
Date Sun, 01 May 2016 12:47:22 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
new file mode 100644
index 0000000..4c40596
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.TableException;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+
+@RunWith(Parameterized.class)
+public class JoinITCase extends MultipleProgramsTestBase {
+
+	public JoinITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testJoin() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		Table result = in1.join(in2).where("b === e").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testJoinWithFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		Table result = in1.join(in2).where("b === e && b < 2").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "Hi,Hallo\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testJoinWithJoinFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		Table result = in1.join(in2).where("b === e && a < 6 && h < b").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "Hello world, how are you?,Hallo Welt wie\n" +
+				"I am fine.,Hallo Welt wie\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testJoinWithMultipleKeys() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		Table result = in1.join(in2).where("a === d && b === h").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+				"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testJoinNonExistingKey() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		// Must fail. Field foo does not exist.
+		in1.join(in2).where("foo === e").select("c, g");
+	}
+
+	@Test(expected = TableException.class)
+	public void testJoinWithNonMatchingKeyTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		Table result = in1.join(in2)
+			// Must fail. Types of join fields are not compatible (Integer and String)
+			.where("a === g").select("c, g");
+
+		tableEnv.toDataSet(result, Row.class).collect();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testJoinWithAmbiguousFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, c");
+
+		// Must fail. Both join inputs contain a field named c, so the field names overlap.
+		in1.join(in2).where("a === d").select("c, g");
+	}
+
+	@Test
+	public void testJoinWithAggregation() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		Table result = in1
+				.join(in2).where("a === d").select("g.count");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "6";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = TableException.class)
+	public void testJoinTablesFromDifferentEnvs() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env);
+		BatchTableEnvironment tEnv2 = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tEnv1.fromDataSet(ds1, "a, b, c");
+		Table in2 = tEnv2.fromDataSet(ds2, "d, e, f, g, h");
+
+		// Must fail. Tables are bound to different TableEnvironments.
+		in1.join(in2).where("a === d").select("g.count");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java
new file mode 100644
index 0000000..ba564bf
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PojoGroupingITCase extends MultipleProgramsTestBase {
+
+	public PojoGroupingITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testPojoGrouping() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<String, Double, String>> data = env.fromElements(
+			new Tuple3<>("A", 23.0, "Z"),
+			new Tuple3<>("A", 24.0, "Y"),
+			new Tuple3<>("B", 1.0, "Z"));
+
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		Table table = tableEnv
+			.fromDataSet(data, "groupMe, value, name")
+			.select("groupMe, value, name")
+			.where("groupMe != 'B'"); // filters out the ("B", 1.0, "Z") input row
+
+		DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
+
+		DataSet<MyPojo> result = myPojos.groupBy("groupMe")
+			.sortGroup("value", Order.DESCENDING)
+			.first(1); // keeps only the element with the largest value in each group
+
+		List<MyPojo> resultList = result.collect();
+		compareResultAsText(resultList, "A,24.0,Y");
+	}
+
+	public static class MyPojo implements Serializable {
+		private static final long serialVersionUID = 8741918940120107213L;
+
+		public String groupMe;
+		public double value;
+		public String name;
+
+		public MyPojo() {
+			// No-argument constructor kept for serialization; the test itself never calls it directly.
+		}
+
+		public MyPojo(String groupMe, double value, String name) {
+			this.groupMe = groupMe;
+			this.value = value;
+			this.name = name;
+		}
+
+		@Override
+		public String toString() {
+			return groupMe + "," + value + "," + name;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
new file mode 100644
index 0000000..7c9478a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class SelectITCase extends TableProgramsTestBase {
+
+	public SelectITCase(TestExecutionMode mode, TableConfigMode configMode) {
+		super(mode, configMode);
+	}
+
+	@Test
+	public void testSimpleSelectAllWithAs() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds, "a,b,c");
+
+		Table result = in
+				.select("a, b, c");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+			"20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testSimpleSelectWithNaming() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds);
+
+		Table result = in
+				.select("f0 as a, f1 as b")
+				.select("a, b");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testSimpleSelectRenameAll() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds);
+
+		Table result = in
+			.select("f0 as a, f1 as b, f2 as c")
+			.select("a, b");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+			"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+			"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testSelectInvalidField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		tableEnv.fromDataSet(ds, "a, b, c")
+			// Must fail. Field foo does not exist
+			.select("a + 1, foo + 2");
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testSelectAmbiguousFieldNames() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		tableEnv.fromDataSet(ds, "a, b, c")
+			// Must fail. The alias foo is assigned to two different expressions.
+			.select("a + 1 as foo, b + 2 as foo");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
new file mode 100644
index 0000000..e55bd22
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.codegen.CodeGenException;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class StringExpressionsITCase extends MultipleProgramsTestBase {
+
+	public StringExpressionsITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testSubstring() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+				new Tuple2<>("AAAA", 2),
+				new Tuple2<>("BBBB", 1));
+
+		Table in = tableEnv.fromDataSet(ds, "a, b");
+
+		Table result = in
+				.select("a.substring(1, b)");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "AA\nB";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testSubstringWithMaxEnd() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+				new Tuple2<>("ABCD", 3),
+				new Tuple2<>("ABCD", 2));
+
+		Table in = tableEnv.fromDataSet(ds, "a, b");
+
+		Table result = in
+				.select("a.substring(b)");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "CD\nBCD";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = CodeGenException.class)
+	public void testNonWorkingSubstring1() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple2<String, Float>> ds = env.fromElements(
+				new Tuple2<>("ABCD", 2.0f),
+				new Tuple2<>("ABCD", 1.0f));
+
+		Table in = tableEnv.fromDataSet(ds, "a, b");
+
+		Table result = in
+			// Must fail. Second parameter of substring must be an Integer not a Float.
+			.select("a.substring(0, b)");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		resultSet.collect();
+	}
+
+	@Test(expected = CodeGenException.class)
+	public void testNonWorkingSubstring2() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple2<String, String>> ds = env.fromElements(
+				new Tuple2<>("ABCD", "a"),
+				new Tuple2<>("ABCD", "b"));
+
+		Table in = tableEnv.fromDataSet(ds, "a, b");
+
+		Table result = in
+			// Must fail. First parameter of substring must be an Integer not a String.
+			.select("a.substring(b, 15)");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		resultSet.collect();
+	}
+
+	@Test(expected = CodeGenException.class)
+	public void testGeneratedCodeForStringComparison() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
+		// Must fail. Column 'a' is an Integer but is compared with the String literal 'Fred'.
+		Table res = in.filter("a = 'Fred'" );
+		DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
+	}
+
+	@Test(expected = CodeGenException.class)
+	public void testGeneratedCodeForIntegerEqualsComparison() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
+		// Must fail. Column 'c' is a String but is compared with the Integer literal 10.
+		Table res = in.filter("c = 10" );
+		DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
+	}
+
+	@Test(expected = CodeGenException.class)
+	public void testGeneratedCodeForIntegerGreaterComparison() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
+		// Must fail. Column 'c' is a String but is compared with the Integer literal 10.
+		Table res = in.filter("c > 10");
+		DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
+	}
+
+	@Test
+	public void testStringConcat() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+			new Tuple2<>("ABCD", 3),
+			new Tuple2<>("ABCD", 2));
+
+		Table in = tableEnv.fromDataSet(ds, "a, b");
+
+		Table result = in
+			.select("a + b + 42");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "ABCD342\nABCD242";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testStringConcat1() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+			new Tuple2<>("ABCD", 3),
+			new Tuple2<>("ABCD", 2));
+
+		Table in = tableEnv.fromDataSet(ds, "a, b");
+
+		Table result = in
+			.select("42 + b + a");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "44ABCD\n45ABCD";
+		compareResultAsText(results, expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
new file mode 100644
index 0000000..a7805f8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.TableException;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class UnionITCase extends MultipleProgramsTestBase {
+
+	public UnionITCase(TestExecutionMode mode) {
+		super(mode); // forwards the execution mode (presumably supplied by the Parameterized runner) to the base class
+	}
+
+	/** Unions two copies of the small 3-tuple data set and expects every "c" value twice. */
+	@Test
+	public void testUnion() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> right = CollectionDataSets.getSmall3TupleDataSet(env);
+
+		Table leftTable = tableEnv.fromDataSet(left, "a, b, c");
+		Table rightTable = tableEnv.fromDataSet(right, "a, b, c");
+		Table unioned = leftTable.unionAll(rightTable).select("c");
+
+		// The expectation lists each of the three strings two times, i.e. duplicates are kept.
+		List<Row> rows = tableEnv.toDataSet(unioned, Row.class).collect();
+		String expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n";
+		compareResultAsText(rows, expected);
+	}
+
+	/** Unions a 3-tuple table with a projected 5-tuple table and keeps rows with {@code b < 2}. */
+	@Test
+	public void testUnionWithFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> small = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> wide = CollectionDataSets.get5TupleDataSet(env);
+
+		Table smallTable = tableEnv.fromDataSet(small, "a, b, c");
+		// The 5-tuple input is projected to (a, b, c) so that both union inputs expose the same fields.
+		Table wideTable = tableEnv.fromDataSet(wide, "a, b, d, c, e").select("a, b, c");
+		Table filtered = smallTable.unionAll(wideTable).where("b < 2").select("c");
+
+		List<Row> rows = tableEnv.toDataSet(filtered, Row.class).collect();
+		String expected = "Hi\n" + "Hallo\n";
+		compareResultAsText(rows, expected);
+	}
+
+	/** A union of a three-field table and a five-field table is expected to be rejected. */
+	@Test(expected = IllegalArgumentException.class)
+	public void testUnionIncompatibleNumberOfFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> threeFields = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> fiveFields = CollectionDataSets.get5TupleDataSet(env);
+		Table narrow = tableEnv.fromDataSet(threeFields, "a, b, c");
+		Table wide = tableEnv.fromDataSet(fiveFields, "d, e, f, g, h");
+
+		// Expected to throw: the two inputs do not have the same number of fields.
+		narrow.unionAll(wide);
+	}
+
+	/** Inputs whose third field is named "c" on one side and "d" on the other must not union. */
+	@Test(expected = IllegalArgumentException.class)
+	public void testUnionIncompatibleFieldsName() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> first = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> second = CollectionDataSets.getSmall3TupleDataSet(env);
+		Table firstTable = tableEnv.fromDataSet(first, "a, b, c");
+		Table secondTable = tableEnv.fromDataSet(second, "a, b, d");
+
+		// Expected to throw: the field names of the two inputs differ.
+		firstTable.unionAll(secondTable);
+	}
+
+	/** Field "c" is a String on one side and an Integer on the other, so the union must fail. */
+	@Test(expected = IllegalArgumentException.class)
+	public void testUnionIncompatibleFieldTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> small = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> wide = CollectionDataSets.get5TupleDataSet(env);
+		Table smallTable = tableEnv.fromDataSet(small, "a, b, c");
+		Table wideTable = tableEnv.fromDataSet(wide, "a, b, c, d, e").select("a, b, c");
+
+		// Expected to throw: the field types of the two inputs do not line up.
+		smallTable.unionAll(wideTable);
+	}
+
+	/** Applies {@code c.count} to the union of a 3-tuple and a projected 5-tuple table. */
+	@Test
+	public void testUnionWithAggregation() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> small = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> wide = CollectionDataSets.get5TupleDataSet(env);
+
+		Table smallTable = tableEnv.fromDataSet(small, "a, b, c");
+		Table wideTable = tableEnv.fromDataSet(wide, "a, b, d, c, e").select("a, b, c");
+
+		// The count is taken over the rows of both inputs; the single value 18 is expected.
+		Table counted = smallTable.unionAll(wideTable).select("c.count");
+
+		List<Row> rows = tableEnv.toDataSet(counted, Row.class).collect();
+		compareResultAsText(rows, "18");
+	}
+
+	/** Joins the union of two tables with a third table on {@code a === a2} and checks (c, c2). */
+	@Test
+	public void testUnionWithJoin() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> small3 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> full5 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> small5 = CollectionDataSets.getSmall5TupleDataSet(env);
+
+		Table unionLeft = tableEnv.fromDataSet(small3, "a, b, c");
+		Table unionRight = tableEnv.fromDataSet(full5, "a, b, d, c, e").select("a, b, c");
+		Table joinSide = tableEnv.fromDataSet(small5, "a2, b2, d2, c2, e2").select("a2, b2, c2");
+
+		Table joined = unionLeft.unionAll(unionRight).join(joinSide).where("a === a2").select("c, c2");
+		List<Row> rows = tableEnv.toDataSet(joined, Row.class).collect();
+
+		String expected = "Hi,Hallo\n" + "Hallo,Hallo\n" +
+			"Hello,Hallo Welt\n" + "Hello,Hallo Welt wie\n" +
+			"Hallo Welt,Hallo Welt\n" + "Hallo Welt wie,Hallo Welt\n" +
+			"Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n";
+		compareResultAsText(rows, expected);
+	}
+
+	/** Tables created by two separate table environments are expected not to be unionable. */
+	@Test(expected = TableException.class)
+	public void testUnionTablesFromDifferentEnvs() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment firstEnv = TableEnvironment.getTableEnvironment(env);
+		BatchTableEnvironment secondEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> first = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> second = CollectionDataSets.getSmall3TupleDataSet(env);
+		Table firstTable = firstEnv.fromDataSet(first, "a, b, c");
+		Table secondTable = secondEnv.fromDataSet(second, "a, b, c");
+
+		// Expected to throw: each table belongs to a different TableEnvironment.
+		firstTable.unionAll(secondTable);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java
deleted file mode 100644
index 31a5d53..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.sql.test;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class BatchSQLITCase extends TableProgramsTestBase {
-
-	public BatchSQLITCase(TestExecutionMode mode, TableConfigMode configMode) {
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testSelectFromTable() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds, "a,b,c");
-		tableEnv.registerTable("T", in);
-
-		String sqlQuery = "SELECT a, c FROM T";
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
-			"4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n" +
-			"7,Comment#1\n" + "8,Comment#2\n" + "9,Comment#3\n" + "10,Comment#4\n" +
-			"11,Comment#5\n" + "12,Comment#6\n" + "13,Comment#7\n" +
-			"14,Comment#8\n" + "15,Comment#9\n" + "16,Comment#10\n" +
-			"17,Comment#11\n" + "18,Comment#12\n" + "19,Comment#13\n" +
-			"20,Comment#14\n" + "21,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testFilterFromDataSet() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		tableEnv.registerDataSet("DataSetTable", ds, "x, y, z");
-
-		String sqlQuery = "SELECT x FROM DataSetTable WHERE z LIKE '%Hello%'";
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "2\n" + "3\n" + "4";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAggregation() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		tableEnv.registerDataSet("AggTable", ds, "x, y, z");
-
-		String sqlQuery = "SELECT sum(x), min(x), max(x), count(y), avg(x) FROM AggTable";
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "231,1,21,21,11";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testJoin() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		tableEnv.registerDataSet("t1", ds1, "a, b, c");
-		tableEnv.registerDataSet("t2",ds2, "d, e, f, g, h");
-
-		String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
-		compareResultAsText(results, expected);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java
deleted file mode 100644
index 8e420f2..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.sql.test;
-
-import org.apache.flink.api.java.table.StreamTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.api.java.table.test.utils.StreamTestData;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class StreamingSQLITCase extends StreamingMultipleProgramsTestBase {
-
-	@Test
-	public void testSelect() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		StreamITCase.clear();
-
-		DataStream<Tuple3<Integer, Long, String>> ds = StreamTestData.getSmall3TupleDataSet(env);
-		Table in = tableEnv.fromDataStream(ds, "a,b,c");
-		tableEnv.registerTable("MyTable", in);
-
-		String sqlQuery = "SELECT STREAM * FROM MyTable";
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
-		env.execute();
-
-		List<String> expected = new ArrayList();
-		expected.add("1,1,Hi");
-		expected.add("2,2,Hello");
-		expected.add("3,2,Hello world");
-
-		StreamITCase.compareWithList(expected);
-	}
-
-	@Test
-	public void testFilter() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		StreamITCase.clear();
-
-		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
-		tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
-
-		String sqlQuery = "SELECT STREAM a, b, e FROM MyTable WHERE c < 4";
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
-		env.execute();
-
-		List<String> expected = new ArrayList();
-		expected.add("1,1,1");
-		expected.add("2,2,2");
-		expected.add("2,3,1");
-		expected.add("3,4,2");
-
-		StreamITCase.compareWithList(expected);
-	}
-
-	@Test
-	public void testUnion() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		StreamITCase.clear();
-
-		DataStream<Tuple3<Integer, Long, String>> ds1 = StreamTestData.getSmall3TupleDataSet(env);
-		Table t1 = tableEnv.fromDataStream(ds1, "a,b,c");
-		tableEnv.registerTable("T1", t1);
-
-		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds2 = StreamTestData.get5TupleDataStream(env);
-		tableEnv.registerDataStream("T2", ds2, "a, b, d, c, e");
-
-		String sqlQuery = "SELECT STREAM * FROM T1 " +
-							"UNION ALL " +
-							"(SELECT STREAM a, b, c FROM T2 WHERE a	< 3)";
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
-		env.execute();
-
-		List<String> expected = new ArrayList();
-		expected.add("1,1,Hi");
-		expected.add("2,2,Hello");
-		expected.add("3,2,Hello world");
-		expected.add("1,1,Hallo");
-		expected.add("2,2,Hallo Welt");
-		expected.add("2,3,Hallo Welt wie");
-
-		StreamITCase.compareWithList(expected);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
new file mode 100644
index 0000000..4161b1e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.stream.sql;
+
+import org.apache.flink.api.java.table.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.api.java.stream.utils.StreamTestData;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SqlITCase extends StreamingMultipleProgramsTestBase {
+
+	/** Runs {@code SELECT STREAM *} on a registered table and expects the three listed rows. */
+	@Test
+	public void testSelect() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple3<Integer, Long, String>> ds = StreamTestData.getSmall3TupleDataSet(env);
+		Table in = tableEnv.fromDataStream(ds, "a,b,c");
+		tableEnv.registerTable("MyTable", in);
+
+		String sqlQuery = "SELECT STREAM * FROM MyTable";
+		Table result = tableEnv.sql(sqlQuery);
+
+		// Route the result into the StreamITCase sink; it is checked after the job has run.
+		tableEnv.toDataStream(result, Row.class).addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,Hi");
+		expected.add("2,2,Hello");
+		expected.add("3,2,Hello world");
+		StreamITCase.compareWithList(expected);
+	}
+
+	/** Runs a streaming SQL projection with {@code WHERE c < 4} and expects four (a, b, e) rows. */
+	@Test
+	public void testFilter() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+		tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
+
+		String sqlQuery = "SELECT STREAM a, b, e FROM MyTable WHERE c < 4";
+		Table result = tableEnv.sql(sqlQuery);
+
+		// Route the result into the StreamITCase sink; it is checked after the job has run.
+		tableEnv.toDataStream(result, Row.class).addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,1");
+		expected.add("2,2,2");
+		expected.add("2,3,1");
+		expected.add("3,4,2");
+		StreamITCase.compareWithList(expected);
+	}
+
+	/** Runs a streaming {@code UNION ALL} of table T1 with a filtered projection of table T2. */
+	@Test
+	public void testUnion() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple3<Integer, Long, String>> ds1 = StreamTestData.getSmall3TupleDataSet(env);
+		Table t1 = tableEnv.fromDataStream(ds1, "a,b,c");
+		tableEnv.registerTable("T1", t1);
+
+		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds2 = StreamTestData.get5TupleDataStream(env);
+		tableEnv.registerDataStream("T2", ds2, "a, b, d, c, e");
+
+		String sqlQuery = "SELECT STREAM * FROM T1 " +
+							"UNION ALL " +
+							"(SELECT STREAM a, b, c FROM T2 WHERE a	< 3)";
+		Table result = tableEnv.sql(sqlQuery);
+
+		// Route the result into the StreamITCase sink; it is checked after the job has run.
+		tableEnv.toDataStream(result, Row.class).addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,Hi");
+		expected.add("2,2,Hello");
+		expected.add("3,2,Hello world");
+		expected.add("1,1,Hallo");
+		expected.add("2,2,Hallo Welt");
+		expected.add("2,3,Hallo Welt wie");
+		StreamITCase.compareWithList(expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/utils/StreamTestData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/utils/StreamTestData.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/utils/StreamTestData.java
new file mode 100644
index 0000000..82ebf95
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/utils/StreamTestData.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.stream.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class StreamTestData {
+
+	/** Builds a stream from three (Integer, Long, String) tuples; the list is shuffled first. */
+	public static DataStream<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env) {
+		List<Tuple3<Integer, Long, String>> tuples = new ArrayList<>();
+		tuples.add(new Tuple3<>(1, 1L, "Hi"));
+		tuples.add(new Tuple3<>(2, 2L, "Hello"));
+		tuples.add(new Tuple3<>(3, 2L, "Hello world"));
+
+		// Randomize the list order, presumably so tests do not depend on a fixed input order.
+		Collections.shuffle(tuples);
+		return env.fromCollection(tuples);
+	}
+
+	/** Builds a stream from fifteen fixed 5-tuples, added in the order listed (no shuffling). */
+	public static DataStream<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataStream(StreamExecutionEnvironment env) {
+		List<Tuple5<Integer, Long, Integer, String, Long>> tuples = new ArrayList<>();
+		tuples.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+		tuples.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+		tuples.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+		tuples.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+		tuples.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+		tuples.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+		tuples.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+		tuples.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+		tuples.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+		tuples.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+		tuples.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+		tuples.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+		tuples.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+		tuples.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
+		tuples.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+		return env.fromCollection(tuples);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
deleted file mode 100644
index 35edb09..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.table.ExpressionParserException;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.plan.PlanGenException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.apache.flink.examples.java.JavaTableExample.WC;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class AggregationsITCase extends MultipleProgramsTestBase {
-
-	public AggregationsITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test
-	public void testAggregationTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-
-		Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "231,1,21,21,11";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testAggregationOnNonExistingField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		Table table =
-				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-
-		Table result =
-				table.select("foo.avg");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testWorkingAggregationDataTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
-				env.fromElements(
-						new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
-						new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
-
-		Table table = tableEnv.fromDataSet(input);
-
-		Table result =
-				table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,1,1,1.5,1.5,2";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAggregationWithArithmetic() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSource<Tuple2<Float, String>> input =
-				env.fromElements(
-						new Tuple2<>(1f, "Hello"),
-						new Tuple2<>(2f, "Ciao"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result =
-				table.select("(f0 + 2).avg + 2, f1.count + 5");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "5.5,7";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAggregationWithTwoCount() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSource<Tuple2<Float, String>> input =
-			env.fromElements(
-				new Tuple2<>(1f, "Hello"),
-				new Tuple2<>(2f, "Ciao"));
-
-		Table table =
-			tableEnv.fromDataSet(input);
-
-		Table result =
-			table.select("f0.count, f1.count");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = PlanGenException.class)
-	public void testNonWorkingDataTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result =
-				// Must fail. Cannot compute SUM aggregate on String field.
-				table.select("f1.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testNoNestedAggregation() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result =
-				// Must fail. Aggregation on aggregation not allowed.
-				table.select("f0.sum.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testPojoAggregation() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		DataSet<WC> input = env.fromElements(
-				new WC("Hello", 1),
-				new WC("Ciao", 1),
-				new WC("Hello", 1),
-				new WC("Hola", 1),
-				new WC("Hola", 1));
-
-		Table table = tableEnv.fromDataSet(input);
-
-		Table filtered = table
-				.groupBy("word")
-				.select("word.count as count, word")
-				.filter("count = 2");
-
-		List<String> result = tableEnv.toDataSet(filtered, WC.class)
-				.map(new MapFunction<WC, String>() {
-					public String map(WC value) throws Exception {
-						return value.word;
-					}
-				}).collect();
-		String expected = "Hello\n" + "Hola";
-		compareResultAsText(result, expected);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
deleted file mode 100644
index 2a17c12..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import java.util.List;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class CastingITCase extends TableProgramsTestBase {
-
-	public CastingITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testNumericAutocastInArithmetic() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple8<Byte, Short, Integer, Long, Float, Double, Long, Double>> input =
-				env.fromElements(new Tuple8<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, 1L, 1001.1));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result = table.select("f0 + 1, f1 +" +
-				" 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1, f6 + 1.0d, f7 + f0");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testNumericAutocastInComparison() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple6<Byte, Short, Integer, Long, Float, Double>> input =
-				env.fromElements(
-						new Tuple6<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d),
-						new Tuple6<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a,b,c,d,e,f");
-
-		Table result = table
-				.filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,2,2,2.0,2.0";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testCasting() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple4<Integer, Double, Long, Boolean>> input =
-				env.fromElements(new Tuple4<>(1, 0.0, 1L, true));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result = table.select(
-				// * -> String
-				"f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)," +
-				// NUMERIC TYPE -> Boolean
-				"f0.cast(BOOL), f1.cast(BOOL), f2.cast(BOOL)," +
-				// NUMERIC TYPE -> NUMERIC TYPE
-				"f0.cast(DOUBLE), f1.cast(INT), f2.cast(SHORT)," +
-				// Boolean -> NUMERIC TYPE
-				"f3.cast(DOUBLE)," +
-				// identity casting
-				"f0.cast(INT), f1.cast(DOUBLE), f2.cast(LONG), f3.cast(BOOL)");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,0.0,1,true," +
-			"true,false,true," +
-			"1.0,0,1," +
-			"1.0," +
-			"1,0.0,1,true\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testCastFromString() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple3<String, String, String>> input =
-				env.fromElements(new Tuple3<>("1", "true", "2.0"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result = table.select(
-				"f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,1,1,2.0,2.0,true\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Ignore // Date type not supported yet
-	@Test
-	public void testCastDateFromString() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple4<String, String, String, String>> input =
-				env.fromElements(new Tuple4<>("2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result = table
-				.select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1, f2.cast(DATE) AS f2, f3.cast(DATE) AS f3")
-				.select("f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," +
-				"1970-01-17 17:47:53.775\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Ignore // Date type not supported yet
-	@Test
-	public void testCastDateToStringAndLong() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<String, String>> input =
-			env.fromElements(new Tuple2<>("2011-05-03 15:51:36.000", "1304437896000"));
-
-		Table table =
-			tableEnv.fromDataSet(input);
-
-		Table result = table
-			.select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1")
-			.select("f0.cast(STRING), f0.cast(LONG), f1.cast(STRING), f1.cast(LONG)");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2011-05-03 15:51:36.000,1304437896000,2011-05-03 15:51:36.000,1304437896000\n";
-		compareResultAsText(results, expected);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java
deleted file mode 100644
index aca0c30..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import java.util.List;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class DistinctITCase extends MultipleProgramsTestBase {
-
-	public DistinctITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test
-	public void testDistinct() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table distinct = table.select("b").distinct();
-
-		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1\n" + "2\n" + "3\n"+ "4\n"+ "5\n"+ "6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testDistinctAfterAggregate() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env);
-
-		Table table = tableEnv.fromDataSet(input, "a, b, c, d, e");
-
-		Table distinct = table.groupBy("a, e").select("e").distinct();
-
-		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1\n" + "2\n" + "3\n";
-		compareResultAsText(results, expected);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
deleted file mode 100644
index 4d6adfa..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.codegen.CodeGenException;
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
-import static org.junit.Assert.fail;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class ExpressionsITCase extends TableProgramsTestBase {
-
-	public ExpressionsITCase(TestExecutionMode mode, TableConfigMode configMode) {
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testArithmetic() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Integer, Integer>> input =
-				env.fromElements(new Tuple2<>(5, 10));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b");
-
-		Table result = table.select(
-				"a - 5, a + 5, a / 2, a * 2, a % 2, -a");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "0,10,2,10,1,-5";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testLogic() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Integer, Boolean>> input =
-				env.fromElements(new Tuple2<>(5, true));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b");
-
-		Table result = table.select(
-				"b && true, b && false, b || false, !b");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "true,false,true,false";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testComparisons() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple3<Integer, Integer, Integer>> input =
-				env.fromElements(new Tuple3<>(5, 5, 4));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table.select(
-				"a > c, a >= b, a < c, a.isNull, a.isNotNull");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "true,true,false,false,true";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testNullLiteral() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Integer, Integer>> input =
-				env.fromElements(new Tuple2<>(1, 0));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b");
-
-		Table result = table.select("a, b, Null(INT), Null(STRING) === ''");
-
-		try {
-			DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-			if (!config().getNullCheck()) {
-				fail("Exception expected if null check is disabled.");
-			}
-			List<Row> results = ds.collect();
-			String expected = "1,0,null,null";
-			compareResultAsText(results, expected);
-		}
-		catch (CodeGenException e) {
-			if (config().getNullCheck()) {
-				throw e;
-			}
-		}
-	}
-
-	@Test
-	public void testEval() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Integer, Boolean>> input =
-				env.fromElements(new Tuple2<>(5, true));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b");
-
-		Table result = table.select(
-				"(b && true).eval('true', 'false')," +
-					"false.eval('true', 'false')," +
-					"true.eval(true.eval(true.eval(10, 4), 4), 4)");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "true,false,10";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testEvalInvalidTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Integer, Boolean>> input =
-				env.fromElements(new Tuple2<>(5, true));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b");
-
-		Table result = table.select("(b && true).eval(5, 'false')");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "true,false,3,10";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testComplexExpression() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple3<Integer, Integer, Integer>> input =
-				env.fromElements(new Tuple3<>(5, 5, 4));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table.select(
-				"a.isNull().isNull," +
-					"a.abs() + a.abs().abs().abs().abs()," +
-					"a.cast(STRING) + a.cast(STRING)," +
-					"CAST(ISNULL(b), INT)," +
-					"ISNULL(CAST(b, INT).abs()) === false," +
-					"((((true) === true) || false).cast(STRING) + 'X ').trim");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "false,10,55,0,true,trueX";
-		compareResultAsText(results, expected);
-	}
-
-}
-


Mime
View raw message