flink-commits mailing list archives

From: va...@apache.org
Subject: [7/9] flink git commit: [FLINK-3847] Restructure flink-table test packages.
Date: Sun, 01 May 2016 12:47:21 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
deleted file mode 100644
index 30b5aab..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class FilterITCase extends TableProgramsTestBase {
-
-	public FilterITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testAllRejectingFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter("false");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAllPassingFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter("true");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-			"20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testFilterOnIntegerTupleField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter(" a % 2 = 0 ");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testNotEquals() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter("!( a % 2 <> 0 ) ");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testDisjunctivePreds() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-			.filter("a < 2 || a > 20");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,Hi\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testIntegerBiggerThan128() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-			.filter("a = 300 ");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "300,1,Hello\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testFilterInvalidField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		table
-			// Must fail. Field foo does not exist.
-			.filter("foo = 17");
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FromDataSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FromDataSetITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FromDataSetITCase.java
deleted file mode 100644
index 64a4092..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FromDataSetITCase.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.ArrayList;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class FromDataSetITCase extends TableProgramsTestBase {
-
-	public FromDataSetITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testAsFromTuple() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		Table table = tableEnv
-			.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
-			.select("a, b, c");
-
-		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-			"20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsFromAndToTuple() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		Table table = tableEnv
-			.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
-			.select("a, b, c");
-
-		TypeInformation<?> ti = new TupleTypeInfo<Tuple3<Integer, Long, String>>(
-			BasicTypeInfo.INT_TYPE_INFO,
-			BasicTypeInfo.LONG_TYPE_INFO,
-			BasicTypeInfo.STRING_TYPE_INFO);
-
-		DataSet<?> ds = tableEnv.toDataSet(table, ti);
-		List<?> results = ds.collect();
-		String expected = "(1,1,Hi)\n" + "(2,2,Hello)\n" + "(3,2,Hello world)\n" +
-			"(4,3,Hello world, how are you?)\n" + "(5,3,I am fine.)\n" + "(6,3,Luke Skywalker)\n" +
-			"(7,4,Comment#1)\n" + "(8,4,Comment#2)\n" + "(9,4,Comment#3)\n" + "(10,4,Comment#4)\n" +
-			"(11,5,Comment#5)\n" + "(12,5,Comment#6)\n" + "(13,5,Comment#7)\n" +
-			"(14,5,Comment#8)\n" + "(15,5,Comment#9)\n" + "(16,6,Comment#10)\n" +
-			"(17,6,Comment#11)\n" + "(18,6,Comment#12)\n" + "(19,6,Comment#13)\n" +
-			"(20,6,Comment#14)\n" + "(21,6,Comment#15)\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsFromTupleToPojo() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		List<Tuple4<String, Integer, Double, String>> data = new ArrayList<>();
-		data.add(new Tuple4<>("Rofl", 1, 1.0, "Hi"));
-		data.add(new Tuple4<>("lol", 2, 1.0, "Hi"));
-		data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));
-
-		Table table = tableEnv
-			.fromDataSet(env.fromCollection(data), "a, b, c, d")
-			.select("a, b, c, d");
-
-		DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
-		List<SmallPojo2> results = ds.collect();
-		String expected = "Rofl,1,1.0,Hi\n" + "lol,2,1.0,Hi\n" + "Test me,4,3.33,Hello world\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsFromPojo() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		List<SmallPojo> data = new ArrayList<>();
-		data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
-		data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
-		data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
-
-		Table table = tableEnv
-			.fromDataSet(env.fromCollection(data),
-				"department AS a, " +
-				"age AS b, " +
-				"salary AS c, " +
-				"name AS d")
-			.select("a, b, c, d");
-
-		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		List<Row> results = ds.collect();
-		String expected =
-			"Sales,28,4000.0,Peter\n" +
-			"Engineering,56,10000.0,Anna\n" +
-			"HR,42,6000.0,Lucy\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsFromPrivateFieldsPojo() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		List<PrivateSmallPojo> data = new ArrayList<>();
-		data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
-		data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
-		data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
-
-		Table table = tableEnv
-			.fromDataSet(env.fromCollection(data),
-				"department AS a, " +
-				"age AS b, " +
-				"salary AS c, " +
-				"name AS d")
-			.select("a, b, c, d");
-
-		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		List<Row> results = ds.collect();
-		String expected =
-			"Sales,28,4000.0,Peter\n" +
-			"Engineering,56,10000.0,Anna\n" +
-			"HR,42,6000.0,Lucy\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsFromAndToPojo() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		List<SmallPojo> data = new ArrayList<>();
-		data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
-		data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
-		data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
-
-		Table table = tableEnv
-			.fromDataSet(env.fromCollection(data),
-				"department AS a, " +
-				"age AS b, " +
-				"salary AS c, " +
-				"name AS d")
-			.select("a, b, c, d");
-
-		DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
-		List<SmallPojo2> results = ds.collect();
-		String expected =
-			"Sales,28,4000.0,Peter\n" +
-			"Engineering,56,10000.0,Anna\n" +
-			"HR,42,6000.0,Lucy\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsFromAndToPrivateFieldPojo() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		List<PrivateSmallPojo> data = new ArrayList<>();
-		data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
-		data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
-		data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
-
-		Table table = tableEnv
-			.fromDataSet(env.fromCollection(data),
-				"department AS a, " +
-				"age AS b, " +
-				"salary AS c, " +
-				"name AS d")
-			.select("a, b, c, d");
-
-		DataSet<PrivateSmallPojo2> ds = tableEnv.toDataSet(table, PrivateSmallPojo2.class);
-		List<PrivateSmallPojo2> results = ds.collect();
-		String expected =
-			"Sales,28,4000.0,Peter\n" +
-			"Engineering,56,10000.0,Anna\n" +
-			"HR,42,6000.0,Lucy\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testAsWithToFewFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail. Not enough field names specified.
-		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testAsWithToManyFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail. Too many field names specified.
-		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testAsWithAmbiguousFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail. Specified field names are not unique.
-		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testAsWithNonFieldReference1() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail. as() only allows field name expressions.
-		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testAsWithNonFieldReference2() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail. as() only allows field name expressions.
-		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b,  c");
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("unused")
-	public static class SmallPojo {
-
-		public SmallPojo() { }
-
-		public SmallPojo(String name, int age, double salary, String department) {
-			this.name = name;
-			this.age = age;
-			this.salary = salary;
-			this.department = department;
-		}
-
-		public String name;
-		public int age;
-		public double salary;
-		public String department;
-	}
-
-	@SuppressWarnings("unused")
-	public static class PrivateSmallPojo {
-
-		public PrivateSmallPojo() { }
-
-		public PrivateSmallPojo(String name, int age, double salary, String department) {
-			this.name = name;
-			this.age = age;
-			this.salary = salary;
-			this.department = department;
-		}
-
-		private String name;
-		private int age;
-		private double salary;
-		private String department;
-
-		public String getName() {
-			return name;
-		}
-
-		public void setName(String name) {
-			this.name = name;
-		}
-
-		public int getAge() {
-			return age;
-		}
-
-		public void setAge(int age) {
-			this.age = age;
-		}
-
-		public double getSalary() {
-			return salary;
-		}
-
-		public void setSalary(double salary) {
-			this.salary = salary;
-		}
-
-		public String getDepartment() {
-			return department;
-		}
-
-		public void setDepartment(String department) {
-			this.department = department;
-		}
-	}
-
-	@SuppressWarnings("unused")
-	public static class SmallPojo2 {
-
-		public SmallPojo2() { }
-
-		public SmallPojo2(String a, int b, double c, String d) {
-			this.a = a;
-			this.b = b;
-			this.c = c;
-			this.d = d;
-		}
-
-		public String a;
-		public int b;
-		public double c;
-		public String d;
-
-		@Override
-		public String toString() {
-			return a + "," + b + "," + c + "," + d;
-		}
-	}
-
-	@SuppressWarnings("unused")
-	public static class PrivateSmallPojo2 {
-
-		public PrivateSmallPojo2() { }
-
-		public PrivateSmallPojo2(String a, int b, double c, String d) {
-			this.a = a;
-			this.b = b;
-			this.c = c;
-			this.d = d;
-		}
-
-		private String a;
-		private int b;
-		private double c;
-		private String d;
-
-		public String getA() {
-			return a;
-		}
-
-		public void setA(String a) {
-			this.a = a;
-		}
-
-		public int getB() {
-			return b;
-		}
-
-		public void setB(int b) {
-			this.b = b;
-		}
-
-		public double getC() {
-			return c;
-		}
-
-		public void setC(double c) {
-			this.c = c;
-		}
-
-		public String getD() {
-			return d;
-		}
-
-		public void setD(String d) {
-			this.d = d;
-		}
-
-		@Override
-		public String toString() {
-			return a + "," + b + "," + c + "," + d;
-		}
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
deleted file mode 100644
index c57c514..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
-
-	public GroupedAggregationsITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testGroupingOnNonExistentField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv
-			.fromDataSet(input, "a, b, c")
-			// Must fail. Field foo is not in the input.
-			.groupBy("foo")
-			.select("a.avg");
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testGroupingInvalidSelection() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv
-			.fromDataSet(input, "a, b, c")
-			.groupBy("a, b")
-			// Must fail. Field c is neither a grouping key nor an aggregation.
-			.select("c");
-	}
-
-	@Test
-	public void testGroupedAggregate() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.groupBy("b").select("b, a.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testGroupingKeyForwardIfNotUsed() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.groupBy("b").select("a.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testGroupNoAggregation() throws Exception {
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-			.groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
-		List<Row> results = ds.collect();
-		compareResultAsText(results, expected);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
deleted file mode 100644
index 8348a4a..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.TableException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-
-@RunWith(Parameterized.class)
-public class JoinITCase extends MultipleProgramsTestBase {
-
-	public JoinITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testJoin() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2).where("b === e").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testJoinWithFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2).where("b === e && b < 2").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hi,Hallo\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testJoinWithJoinFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2).where("b === e && a < 6 && h < b").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hello world, how are you?,Hallo Welt wie\n" +
-				"I am fine.,Hallo Welt wie\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testJoinWithMultipleKeys() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2).where("a === d && b === h").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-				"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testJoinNonExistingKey() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		// Must fail. Field foo does not exist.
-		in1.join(in2).where("foo === e").select("c, g");
-	}
-
-	@Test(expected = TableException.class)
-	public void testJoinWithNonMatchingKeyTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2)
-			// Must fail. Types of join fields are not compatible (Integer and String)
-			.where("a === g").select("c, g");
-
-		tableEnv.toDataSet(result, Row.class).collect();
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testJoinWithAmbiguousFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, c");
-
-		// Must fail. The join inputs have overlapping field names.
-		in1.join(in2).where("a === d").select("c, g");
-	}
-
-	@Test
-	public void testJoinWithAggregation() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1
-				.join(in2).where("a === d").select("g.count");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "6";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = TableException.class)
-	public void testJoinTablesFromDifferentEnvs() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env);
-		BatchTableEnvironment tEnv2 = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tEnv1.fromDataSet(ds1, "a, b, c");
-		Table in2 = tEnv2.fromDataSet(ds2, "d, e, f, g, h");
-
-		// Must fail. Tables are bound to different TableEnvironments.
-		in1.join(in2).where("a === d").select("g.count");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
deleted file mode 100644
index 1109e78..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class PojoGroupingITCase extends MultipleProgramsTestBase {
-
-	public PojoGroupingITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testPojoGrouping() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<String, Double, String>> data = env.fromElements(
-			new Tuple3<>("A", 23.0, "Z"),
-			new Tuple3<>("A", 24.0, "Y"),
-			new Tuple3<>("B", 1.0, "Z"));
-
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		Table table = tableEnv
-			.fromDataSet(data, "groupMe, value, name")
-			.select("groupMe, value, name")
-			.where("groupMe != 'B'");
-
-		DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
-
-		DataSet<MyPojo> result = myPojos.groupBy("groupMe")
-			.sortGroup("value", Order.DESCENDING)
-			.first(1);
-
-		List<MyPojo> resultList = result.collect();
-		compareResultAsText(resultList, "A,24.0,Y");
-	}
-
-	public static class MyPojo implements Serializable {
-		private static final long serialVersionUID = 8741918940120107213L;
-
-		public String groupMe;
-		public double value;
-		public String name;
-
-		public MyPojo() {
-			// for serialization
-		}
-
-		public MyPojo(String groupMe, double value, String name) {
-			this.groupMe = groupMe;
-			this.value = value;
-			this.name = name;
-		}
-
-		@Override
-		public String toString() {
-			return groupMe + "," + value + "," + name;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
deleted file mode 100644
index 5029808..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class SelectITCase extends TableProgramsTestBase {
-
-	public SelectITCase(TestExecutionMode mode, TableConfigMode configMode) {
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testSimpleSelectAllWithAs() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds, "a,b,c");
-
-		Table result = in
-				.select("a, b, c");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-			"20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testSimpleSelectWithNaming() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds);
-
-		Table result = in
-				.select("f0 as a, f1 as b")
-				.select("a, b");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testSimpleSelectRenameAll() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds);
-
-		Table result = in
-			.select("f0 as a, f1 as b, f2 as c")
-			.select("a, b");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-			"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-			"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testSelectInvalidField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv.fromDataSet(ds, "a, b, c")
-			// Must fail. Field foo does not exist
-			.select("a + 1, foo + 2");
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testSelectAmbiguousFieldNames() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv.fromDataSet(ds, "a, b, c")
-			// Must fail. The alias foo is used twice, so the field names are ambiguous.
-			.select("a + 1 as foo, b + 2 as foo");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainTest.java
deleted file mode 100644
index 9389a0c..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SqlExplainTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Scanner;
-
-import static org.junit.Assert.assertEquals;
-
-public class SqlExplainTest extends MultipleProgramsTestBase {
-
-	public SqlExplainTest() {
-		super(TestExecutionMode.CLUSTER);
-	}
-
-	private static String testFilePath = SqlExplainTest.class.getResource("/").getFile();
-
-	@Test
-	public void testFilterWithoutExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input = env.fromElements(new Tuple2<>(1,"d"));
-		Table table = tableEnv
-			.fromDataSet(input, "a, b")
-			.filter("a % 2 = 0");
-
-		String result = tableEnv.explain(table);
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testFilter0.out"))){
-			String source = scanner.useDelimiter("\\A").next();
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testFilterWithExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input = env.fromElements(new Tuple2<>(1,"d"));
-		Table table = tableEnv
-			.fromDataSet(input, "a, b")
-			.filter("a % 2 = 0");
-
-		String result = tableEnv.explain(table, true);
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testFilter1.out"))){
-			String source = scanner.useDelimiter("\\A").next();
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testJoinWithoutExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
-		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
-		Table table1 = tableEnv.fromDataSet(input1, "a, b");
-		Table table2 = tableEnv.fromDataSet(input2, "c, d");
-		Table table = table1
-			.join(table2)
-			.where("b = d")
-			.select("a, c");
-
-		String result = tableEnv.explain(table);
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testJoin0.out"))){
-			String source = scanner.useDelimiter("\\A").next();
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testJoinWithExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
-		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
-		Table table1 = tableEnv.fromDataSet(input1, "a, b");
-		Table table2 = tableEnv.fromDataSet(input2, "c, d");
-		Table table = table1
-			.join(table2)
-			.where("b = d")
-			.select("a, c");
-
-		String result = tableEnv.explain(table, true);
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testJoin1.out"))){
-			String source = scanner.useDelimiter("\\A").next();
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testUnionWithoutExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
-		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
-		Table table1 = tableEnv.fromDataSet(input1, "count, word");
-		Table table2 = tableEnv.fromDataSet(input2, "count, word");
-		Table table = table1.unionAll(table2);
-
-		String result = tableEnv.explain(table);
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testUnion0.out"))){
-			String source = scanner.useDelimiter("\\A").next();
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testUnionWithExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
-		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
-		Table table1 = tableEnv.fromDataSet(input1, "count, word");
-		Table table2 = tableEnv.fromDataSet(input2, "count, word");
-		Table table = table1.unionAll(table2);
-
-		String result = tableEnv.explain(table, true);
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testUnion1.out"))){
-			String source = scanner.useDelimiter("\\A").next();
-			assertEquals(source, result);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
deleted file mode 100644
index 32afc5d..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.codegen.CodeGenException;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class StringExpressionsITCase extends MultipleProgramsTestBase {
-
-	public StringExpressionsITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testSubstring() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-				new Tuple2<>("AAAA", 2),
-				new Tuple2<>("BBBB", 1));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-				.select("a.substring(1, b)");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "AA\nB";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testSubstringWithMaxEnd() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-				new Tuple2<>("ABCD", 3),
-				new Tuple2<>("ABCD", 2));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-				.select("a.substring(b)");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "CD\nBCD";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = CodeGenException.class)
-	public void testNonWorkingSubstring1() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, Float>> ds = env.fromElements(
-				new Tuple2<>("ABCD", 2.0f),
-				new Tuple2<>("ABCD", 1.0f));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-			// Must fail. Second parameter of substring must be an Integer, not a Float.
-			.select("a.substring(0, b)");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		resultSet.collect();
-	}
-
-	@Test(expected = CodeGenException.class)
-	public void testNonWorkingSubstring2() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, String>> ds = env.fromElements(
-				new Tuple2<>("ABCD", "a"),
-				new Tuple2<>("ABCD", "b"));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-			// Must fail. First parameter of substring must be an Integer, not a String.
-			.select("a.substring(b, 15)");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		resultSet.collect();
-	}
-
-	@Test(expected = CodeGenException.class)
-	public void testGeneratedCodeForStringComparison() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
-		// Must fail. The comparison is between an Integer (column 'a') and a String ('Fred').
-		Table res = in.filter("a = 'Fred'");
-		DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
-	}
-
-	@Test(expected = CodeGenException.class)
-	public void testGeneratedCodeForIntegerEqualsComparison() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
-		// Must fail. The comparison is between a String (column 'c') and an Integer (10).
-		Table res = in.filter("c = 10");
-		DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
-	}
-
-	@Test(expected = CodeGenException.class)
-	public void testGeneratedCodeForIntegerGreaterComparison() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
-		// Must fail. The comparison is between a String (column 'c') and an Integer (10).
-		Table res = in.filter("c > 10");
-		DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
-	}
-
-	@Test
-	public void testStringConcat() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-			new Tuple2<>("ABCD", 3),
-			new Tuple2<>("ABCD", 2));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-			.select("a + b + 42");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "ABCD342\nABCD242";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testStringConcat1() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-			new Tuple2<>("ABCD", 3),
-			new Tuple2<>("ABCD", 2));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-			.select("42 + b + a");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "44ABCD\n45ABCD";
-		compareResultAsText(results, expected);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableEnvironmentITCase.java
deleted file mode 100644
index c596014..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableEnvironmentITCase.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.TableException;
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class TableEnvironmentITCase extends TableProgramsTestBase {
-
-	public TableEnvironmentITCase(TestExecutionMode mode, TableConfigMode configMode) {
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testSimpleRegister() throws Exception {
-		final String tableName = "MyTable";
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		tableEnv.registerDataSet(tableName, ds);
-		Table t = tableEnv.scan(tableName);
-
-		Table result = t.select("f0, f1");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testRegisterWithFields() throws Exception {
-		final String tableName = "MyTable";
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		tableEnv.registerDataSet(tableName, ds, "a, b, c");
-		Table t = tableEnv.scan(tableName);
-
-		Table result = t.select("a, b, c");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-				"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-				"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-				"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-				"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-				"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-				"20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = TableException.class)
-	public void testRegisterExistingDatasetTable() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		tableEnv.registerDataSet("MyTable", ds);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 =
-				CollectionDataSets.getSmall5TupleDataSet(env);
-		// Must fail. Name is already used for different table.
-		tableEnv.registerDataSet("MyTable", ds2);
-	}
-
-	@Test(expected = TableException.class)
-	public void testScanUnregisteredTable() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail. No table registered under that name.
-		tableEnv.scan("nonRegisteredTable");
-	}
-
-	@Test
-	public void testTableRegister() throws Exception {
-		final String tableName = "MyTable";
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table t = tableEnv.fromDataSet(ds);
-		tableEnv.registerTable(tableName, t);
-		Table result = tableEnv.scan(tableName).select("f0, f1").filter("f0 > 7");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" +
-				"13,5\n" + "14,5\n" + "15,5\n" +
-				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = TableException.class)
-	public void testIllegalName() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table t = tableEnv.fromDataSet(ds);
-		// Must fail. Table name matches internal name pattern.
-		tableEnv.registerTable("_DataSetTable_42", t);
-	}
-
-	@Test(expected = TableException.class)
-	public void testRegisterTableFromOtherEnv() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv1 = TableEnvironment.getTableEnvironment(env, config());
-		BatchTableEnvironment tableEnv2 = TableEnvironment.getTableEnvironment(env, config());
-
-		Table t = tableEnv1.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-		// Must fail. Table is bound to different TableEnvironment.
-		tableEnv2.registerTable("MyTable", t);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableSourceITCase.java
deleted file mode 100644
index 27129a3..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/TableSourceITCase.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.scala.table.test.GeneratingInputFormat;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.sources.BatchTableSource;
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-
-@RunWith(Parameterized.class)
-public class TableSourceITCase extends TableProgramsTestBase {
-
-	public TableSourceITCase(TestExecutionMode mode, TableConfigMode configMode) {
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testBatchTableSourceTableAPI() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		tableEnv.registerTableSource("MyTable", new TestBatchTableSource());
-
-		Table result = tableEnv.scan("MyTable")
-			.where("amount < 4")
-			.select("amount * id, name");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-
-		String expected = "0,Record_0\n" + "0,Record_16\n" + "0,Record_32\n" + "1,Record_1\n" +
-			"17,Record_17\n" + "36,Record_18\n" + "4,Record_2\n" + "57,Record_19\n" + "9,Record_3\n";
-
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testBatchTableSourceSQL() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		tableEnv.registerTableSource("MyTable", new TestBatchTableSource());
-
-		Table result = tableEnv
-			.sql("SELECT amount * id, name FROM MyTable WHERE amount < 4");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-
-		String expected = "0,Record_0\n" + "0,Record_16\n" + "0,Record_32\n" + "1,Record_1\n" +
-			"17,Record_17\n" + "36,Record_18\n" + "4,Record_2\n" + "57,Record_19\n" + "9,Record_3\n";
-
-		compareResultAsText(results, expected);
-	}
-
-	public static class TestBatchTableSource implements BatchTableSource<Row> {
-
-		private TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
-			BasicTypeInfo.STRING_TYPE_INFO,
-			BasicTypeInfo.LONG_TYPE_INFO,
-			BasicTypeInfo.INT_TYPE_INFO
-		};
-
-		@Override
-		public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
-			return execEnv.createInput(new GeneratingInputFormat(33), getReturnType()).setParallelism(1);
-		}
-
-		@Override
-		public int getNumberOfFields() {
-			return 3;
-		}
-
-		@Override
-		public String[] getFieldsNames() {
-			return new String[]{"name", "id", "amount"};
-		}
-
-		@Override
-		public TypeInformation<?>[] getFieldTypes() {
-			return fieldTypes;
-		}
-
-		@Override
-		public TypeInformation<Row> getReturnType() {
-			return new RowTypeInfo(fieldTypes);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
deleted file mode 100644
index b69fec4..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.TableException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class UnionITCase extends MultipleProgramsTestBase {
-
-	public UnionITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testUnion() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "a, b, c");
-
-		Table selected = in1.unionAll(in2).select("c");
-
-		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testUnionWithFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, e").select("a, b, c");
-
-		Table selected = in1.unionAll(in2).where("b < 2").select("c");
-
-		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hi\n" + "Hallo\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testUnionIncompatibleNumberOfFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		// Must fail. Number of fields of union inputs do not match
-		in1.unionAll(in2);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testUnionIncompatibleFieldsName() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "a, b, d");
-
-		// Must fail. Field names of union inputs do not match
-		in1.unionAll(in2);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testUnionIncompatibleFieldTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "a, b, c, d, e").select("a, b, c");
-
-		// Must fail. Field types of union inputs do not match
-		in1.unionAll(in2);
-	}
-
-	@Test
-	public void testUnionWithAggregation() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, e").select("a, b, c");
-
-		Table selected = in1.unionAll(in2).select("c.count");
-
-		DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "18";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testUnionWithJoin() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds3 = CollectionDataSets.getSmall5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, e").select("a, b, c");
-		Table in3 = tableEnv.fromDataSet(ds3, "a2, b2, d2, c2, e2").select("a2, b2, c2");
-
-		Table joinDs = in1.unionAll(in2).join(in3).where("a === a2").select("c, c2");
-		DataSet<Row> ds = tableEnv.toDataSet(joinDs, Row.class);
-		List<Row> results = ds.collect();
-
-		String expected = "Hi,Hallo\n" + "Hallo,Hallo\n" +
-			"Hello,Hallo Welt\n" + "Hello,Hallo Welt wie\n" +
-			"Hallo Welt,Hallo Welt\n" + "Hallo Welt wie,Hallo Welt\n" +
-			"Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = TableException.class)
-	public void testUnionTablesFromDifferentEnvs() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env);
-		BatchTableEnvironment tEnv2 = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
-
-		Table in1 = tEnv1.fromDataSet(ds1, "a, b, c");
-		Table in2 = tEnv2.fromDataSet(ds2, "a, b, c");
-
-		// Must fail. Tables are bound to different TableEnvironments.
-		in1.unionAll(in2);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/utils/StreamTestData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/utils/StreamTestData.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/utils/StreamTestData.java
deleted file mode 100644
index dc3a8dc..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/utils/StreamTestData.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test.utils;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class StreamTestData {
-
-	public static DataStream<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env) {
-
-		List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
-		data.add(new Tuple3<>(1, 1L, "Hi"));
-		data.add(new Tuple3<>(2, 2L, "Hello"));
-		data.add(new Tuple3<>(3, 2L, "Hello world"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	public static DataStream<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataStream(StreamExecutionEnvironment env) {
-
-		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
-		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
-		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
-		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
-		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
-		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
-		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
-		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
-		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
-		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
-		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
-		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
-		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
-		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
-		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
-		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
-		return env.fromCollection(data);
-	}
-}

