http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
new file mode 100644
index 0000000..4c40596
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.TableException;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+
+@RunWith(Parameterized.class)
+public class JoinITCase extends MultipleProgramsTestBase {
+
+ public JoinITCase(TestExecutionMode mode) {
+ super(mode); // hand the parameterized execution mode to MultipleProgramsTestBase
+ }
+
+ /** Joins the small 3-tuple set with the 5-tuple set on {@code b === e} and checks the projected {@code c, g} pairs. */
+ @Test
+ public void testJoin() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> leftData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> rightData = CollectionDataSets.get5TupleDataSet(execEnv);
+
+     Table left = tEnv.fromDataSet(leftData, "a, b, c");
+     Table right = tEnv.fromDataSet(rightData, "d, e, f, g, h");
+     Table joined = left.join(right).where("b === e").select("c, g");
+
+     // Materialize the join result as rows and compare it with the expected text.
+     List<Row> actual = tEnv.toDataSet(joined, Row.class).collect();
+     String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /** Joins on {@code b === e} and additionally restricts the left key with {@code b < 2}. */
+ @Test
+ public void testJoinWithFilter() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> leftData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> rightData = CollectionDataSets.get5TupleDataSet(execEnv);
+
+     Table left = tEnv.fromDataSet(leftData, "a, b, c");
+     Table right = tEnv.fromDataSet(rightData, "d, e, f, g, h");
+     Table joined = left.join(right).where("b === e && b < 2").select("c, g");
+
+     // A single pair is expected to pass both the key equality and the filter.
+     List<Row> actual = tEnv.toDataSet(joined, Row.class).collect();
+     String expected = "Hi,Hallo\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /** Joins on {@code b === e} with a one-sided filter ({@code a < 6}) and a cross-input comparison ({@code h < b}). */
+ @Test
+ public void testJoinWithJoinFilter() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> leftData = CollectionDataSets.get3TupleDataSet(execEnv);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> rightData = CollectionDataSets.get5TupleDataSet(execEnv);
+
+     Table left = tEnv.fromDataSet(leftData, "a, b, c");
+     Table right = tEnv.fromDataSet(rightData, "d, e, f, g, h");
+     Table joined = left.join(right).where("b === e && a < 6 && h < b").select("c, g");
+
+     // Materialize the join result as rows and compare it with the expected text.
+     List<Row> actual = tEnv.toDataSet(joined, Row.class).collect();
+     String expected = "Hello world, how are you?,Hallo Welt wie\n" +
+         "I am fine.,Hallo Welt wie\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /** Joins on two key pairs at once: {@code a === d} and {@code b === h}. */
+ @Test
+ public void testJoinWithMultipleKeys() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> leftData = CollectionDataSets.get3TupleDataSet(execEnv);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> rightData = CollectionDataSets.get5TupleDataSet(execEnv);
+
+     Table left = tEnv.fromDataSet(leftData, "a, b, c");
+     Table right = tEnv.fromDataSet(rightData, "d, e, f, g, h");
+     Table joined = left.join(right).where("a === d && b === h").select("c, g");
+
+     // Materialize the join result as rows and compare it with the expected text.
+     List<Row> actual = tEnv.toDataSet(joined, Row.class).collect();
+     String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+         "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /** Referencing the unknown field {@code foo} in the join predicate must be rejected. */
+ @Test(expected = IllegalArgumentException.class)
+ public void testJoinNonExistingKey() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> leftData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> rightData = CollectionDataSets.get5TupleDataSet(execEnv);
+     Table left = tEnv.fromDataSet(leftData, "a, b, c");
+     Table right = tEnv.fromDataSet(rightData, "d, e, f, g, h");
+
+     // Must fail: neither input declares a field named foo.
+     left.join(right).where("foo === e").select("c, g");
+ }
+
+ /** Joining on keys of different types must fail with a {@code TableException}. */
+ @Test(expected = TableException.class)
+ public void testJoinWithNonMatchingKeyTypes() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> leftData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> rightData = CollectionDataSets.get5TupleDataSet(execEnv);
+
+     Table left = tEnv.fromDataSet(leftData, "a, b, c");
+     Table right = tEnv.fromDataSet(rightData, "d, e, f, g, h");
+
+     // Must fail: the join fields are not compatible (a is an Integer, g is a String).
+     Table joined = left.join(right).where("a === g").select("c, g");
+
+     tEnv.toDataSet(joined, Row.class).collect();
+ }
+
+ /** Join inputs that share a field name must be rejected. */
+ @Test(expected = IllegalArgumentException.class)
+ public void testJoinWithAmbiguousFields() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> leftData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> rightData = CollectionDataSets.get5TupleDataSet(execEnv);
+     Table left = tEnv.fromDataSet(leftData, "a, b, c");
+     Table right = tEnv.fromDataSet(rightData, "d, e, f, g, c");
+
+     // Must fail: both join inputs expose a field named c.
+     left.join(right).where("a === d").select("c, g");
+ }
+
+ /** Applies a {@code count} aggregation directly on top of a join on {@code a === d}. */
+ @Test
+ public void testJoinWithAggregation() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> leftData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> rightData = CollectionDataSets.get5TupleDataSet(execEnv);
+
+     Table left = tEnv.fromDataSet(leftData, "a, b, c");
+     Table right = tEnv.fromDataSet(rightData, "d, e, f, g, h");
+
+     // Count the g values of all joined row pairs.
+     Table counted = left.join(right).where("a === d").select("g.count");
+
+     List<Row> actual = tEnv.toDataSet(counted, Row.class).collect();
+     String expected = "6";
+     compareResultAsText(actual, expected);
+ }
+
+ /** Tables created by two distinct table environments must not be joinable. */
+ @Test(expected = TableException.class)
+ public void testJoinTablesFromDifferentEnvs() {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment firstEnv = TableEnvironment.getTableEnvironment(execEnv);
+     BatchTableEnvironment secondEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> leftData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> rightData = CollectionDataSets.get5TupleDataSet(execEnv);
+     Table left = firstEnv.fromDataSet(leftData, "a, b, c");
+     Table right = secondEnv.fromDataSet(rightData, "d, e, f, g, h");
+
+     // Must fail: the two tables are bound to different TableEnvironments.
+     left.join(right).where("a === d").select("g.count");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java
new file mode 100644
index 0000000..ba564bf
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PojoGroupingITCase extends MultipleProgramsTestBase {
+
+ public PojoGroupingITCase(TestExecutionMode mode) {
+ super(mode); // hand the parameterized execution mode to MultipleProgramsTestBase
+ }
+
+ /** Converts a filtered table to {@code MyPojo}s and keeps, per group, the entry with the largest {@code value}. */
+ @Test
+ public void testPojoGrouping() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     DataSet<Tuple3<String, Double, String>> input = execEnv.fromElements(
+         new Tuple3<>("A", 23.0, "Z"),
+         new Tuple3<>("A", 24.0, "Y"),
+         new Tuple3<>("B", 1.0, "Z"));
+
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+     Table source = tEnv.fromDataSet(input, "groupMe, value, name");
+     Table projected = source.select("groupMe, value, name");
+     // Drop group B so that only group A reaches the DataSet program below.
+     Table withoutB = projected.where("groupMe != 'B'");
+
+     // Back in the DataSet API: sort each group by value (descending) and keep its first element.
+     DataSet<MyPojo> pojos = tEnv.toDataSet(withoutB, MyPojo.class);
+     DataSet<MyPojo> topPerGroup = pojos
+         .groupBy("groupMe")
+         .sortGroup("value", Order.DESCENDING)
+         .first(1);
+
+     List<MyPojo> actual = topPerGroup.collect();
+     compareResultAsText(actual, "A,24.0,Y");
+ }
+
+ /** Result type of the table conversion; {@link #toString()} renders the three fields comma-separated. */
+ public static class MyPojo implements Serializable {
+     private static final long serialVersionUID = 8741918940120107213L;
+
+     public String groupMe;
+     public double value;
+     public String name;
+
+     public MyPojo() { /* for serialization */ }
+
+     public MyPojo(String groupMe, double value, String name) {
+         this.name = name;
+         this.value = value;
+         this.groupMe = groupMe;
+     }
+
+     @Override
+     public String toString() {
+         return new StringBuilder()
+             .append(groupMe).append(',').append(value).append(',').append(name).toString();
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
new file mode 100644
index 0000000..7c9478a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class SelectITCase extends TableProgramsTestBase {
+
+ public SelectITCase(TestExecutionMode mode, TableConfigMode configMode) {
+ super(mode, configMode); // both parameterized modes are handled by TableProgramsTestBase
+ }
+
+ /** Names the tuple fields {@code a, b, c} and selects all three of them. */
+ @Test
+ public void testSimpleSelectAllWithAs() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv, config());
+
+     DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(execEnv);
+     // The field names are assigned while the DataSet is converted into a table.
+     Table projected = tEnv.fromDataSet(input, "a,b,c").select("a, b, c");
+
+     List<Row> actual = tEnv.toDataSet(projected, Row.class).collect();
+
+     // All three columns of every row are expected in the result.
+     String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+         "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+         "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+         "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+         "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+         "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+         "20,6,Comment#14\n" + "21,6,Comment#15\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /** Renames two of the default tuple fields with {@code as} and selects them by their new names. */
+ @Test
+ public void testSimpleSelectWithNaming() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv, config());
+
+     DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(execEnv);
+     // The table is created without explicit field names, so the first select refers to f0 and f1.
+     Table source = tEnv.fromDataSet(input);
+     Table renamed = source.select("f0 as a, f1 as b");
+     Table projected = renamed.select("a, b");
+
+     List<Row> actual = tEnv.toDataSet(projected, Row.class).collect();
+
+     String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+         "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+         "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /** Renames all three default tuple fields with {@code as} and then selects only {@code a, b}. */
+ @Test
+ public void testSimpleSelectRenameAll() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv, config());
+
+     DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(execEnv);
+     // The table is created without explicit field names, so the first select refers to f0, f1 and f2.
+     Table source = tEnv.fromDataSet(input);
+     Table renamed = source.select("f0 as a, f1 as b, f2 as c");
+     Table projected = renamed.select("a, b");
+
+     List<Row> actual = tEnv.toDataSet(projected, Row.class).collect();
+
+     String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+         "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+         "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /** Selecting an expression over the unknown field {@code foo} must be rejected. */
+ @Test(expected = IllegalArgumentException.class)
+ public void testSelectInvalidField() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv, config());
+     DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(execEnv);
+     Table source = tEnv.fromDataSet(input, "a, b, c");
+
+     // Must fail: the table only has the fields a, b and c, so foo does not exist.
+     source.select("a + 1, foo + 2");
+ }
+
+ /** Giving two select expressions the same alias must be rejected. */
+ @Test(expected = IllegalArgumentException.class)
+ public void testSelectAmbiguousFieldNames() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv, config());
+     DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(execEnv);
+     Table source = tEnv.fromDataSet(input, "a, b, c");
+
+     // Must fail: both expressions are aliased as foo, which makes the field name ambiguous.
+     source.select("a + 1 as foo, b + 2 as foo");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
new file mode 100644
index 0000000..e55bd22
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.codegen.CodeGenException;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class StringExpressionsITCase extends MultipleProgramsTestBase {
+
+ public StringExpressionsITCase(TestExecutionMode mode) {
+ super(mode); // hand the parameterized execution mode to MultipleProgramsTestBase
+ }
+
+ /** Evaluates the two-argument {@code substring} where the second argument is read from column {@code b}. */
+ @Test
+ public void testSubstring() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple2<String, Integer>> input = execEnv.fromElements(
+         new Tuple2<>("AAAA", 2),
+         new Tuple2<>("BBBB", 1));
+
+     // Column b supplies the second substring argument for each row.
+     Table source = tEnv.fromDataSet(input, "a, b");
+     Table substrings = source.select("a.substring(1, b)");
+
+     List<Row> actual = tEnv.toDataSet(substrings, Row.class).collect();
+
+     String expected = "AA\nB";
+     compareResultAsText(actual, expected);
+ }
+
+ /** Evaluates the one-argument {@code substring}, which takes only a start position read from column {@code b}. */
+ @Test
+ public void testSubstringWithMaxEnd() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple2<String, Integer>> input = execEnv.fromElements(
+         new Tuple2<>("ABCD", 3),
+         new Tuple2<>("ABCD", 2));
+
+     // Column b supplies the only substring argument for each row.
+     Table source = tEnv.fromDataSet(input, "a, b");
+     Table substrings = source.select("a.substring(b)");
+
+     List<Row> actual = tEnv.toDataSet(substrings, Row.class).collect();
+
+     String expected = "CD\nBCD";
+     compareResultAsText(actual, expected);
+ }
+
+ /** A {@code Float} column used as the second {@code substring} argument must raise a {@code CodeGenException}. */
+ @Test(expected = CodeGenException.class)
+ public void testNonWorkingSubstring1() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple2<String, Float>> input = execEnv.fromElements(
+         new Tuple2<>("ABCD", 2.0f),
+         new Tuple2<>("ABCD", 1.0f));
+
+     Table source = tEnv.fromDataSet(input, "a, b");
+
+     // Must fail. Second parameter of substring must be an Integer not a Float.
+     Table substrings = source.select("a.substring(0, b)");
+
+     // Convert and run the program; the failure may only surface at this point.
+     tEnv.toDataSet(substrings, Row.class).collect();
+ }
+
+ /** A {@code String} column used as the first {@code substring} argument must raise a {@code CodeGenException}. */
+ @Test(expected = CodeGenException.class)
+ public void testNonWorkingSubstring2() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple2<String, String>> input = execEnv.fromElements(
+         new Tuple2<>("ABCD", "a"),
+         new Tuple2<>("ABCD", "b"));
+
+     Table source = tEnv.fromDataSet(input, "a, b");
+
+     // Must fail. First parameter of substring must be an Integer not a String.
+     Table substrings = source.select("a.substring(b, 15)");
+
+     // Convert and run the program; the failure may only surface at this point.
+     tEnv.toDataSet(substrings, Row.class).collect();
+ }
+
+ /** Comparing the Integer column {@code a} with a String literal must raise a {@code CodeGenException}. */
+ @Test(expected = CodeGenException.class)
+ public void testGeneratedCodeForStringComparison() throws Exception {
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+     DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
+     Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
+     // Must fail because the comparison here is between Integer (column 'a') and String ('Fred').
+     tableEnv.toDataSet(in.filter("a = 'Fred'"), Row.class);
+ }
+
+ /** Testing the String column {@code c} for equality with an Integer literal must raise a {@code CodeGenException}. */
+ @Test(expected = CodeGenException.class)
+ public void testGeneratedCodeForIntegerEqualsComparison() throws Exception {
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+     DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
+     Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
+     // Must fail because the comparison here is between String (column 'c') and Integer (10).
+     tableEnv.toDataSet(in.filter("c = 10"), Row.class);
+ }
+
+ /** Ordering the String column {@code c} against an Integer literal must raise a {@code CodeGenException}. */
+ @Test(expected = CodeGenException.class)
+ public void testGeneratedCodeForIntegerGreaterComparison() throws Exception {
+     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+     DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
+     Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
+     // Must fail because the comparison here is between String (column 'c') and Integer (10).
+     tableEnv.toDataSet(in.filter("c > 10"), Row.class);
+ }
+
+ /** Uses {@code +} with a String column on the left, followed by an Integer column and an Integer literal. */
+ @Test
+ public void testStringConcat() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple2<String, Integer>> input = execEnv.fromElements(
+         new Tuple2<>("ABCD", 3),
+         new Tuple2<>("ABCD", 2));
+
+     // The expected text shows b and 42 appended to a as text.
+     Table source = tEnv.fromDataSet(input, "a, b");
+     Table concatenated = source.select("a + b + 42");
+
+     List<Row> actual = tEnv.toDataSet(concatenated, Row.class).collect();
+
+     String expected = "ABCD342\nABCD242";
+     compareResultAsText(actual, expected);
+ }
+
+ /** Uses {@code +} with an Integer literal and an Integer column first and the String column last. */
+ @Test
+ public void testStringConcat1() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple2<String, Integer>> input = execEnv.fromElements(
+         new Tuple2<>("ABCD", 3),
+         new Tuple2<>("ABCD", 2));
+
+     // The expected text shows 42 + b added numerically before a is appended.
+     Table source = tEnv.fromDataSet(input, "a, b");
+     Table concatenated = source.select("42 + b + a");
+
+     List<Row> actual = tEnv.toDataSet(concatenated, Row.class).collect();
+
+     String expected = "44ABCD\n45ABCD";
+     compareResultAsText(actual, expected);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
new file mode 100644
index 0000000..a7805f8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.TableException;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class UnionITCase extends MultipleProgramsTestBase {
+
+ public UnionITCase(TestExecutionMode mode) {
+ super(mode); // hand the parameterized execution mode to MultipleProgramsTestBase
+ }
+
+ /** Unions the small 3-tuple set with itself via {@code unionAll} and selects column {@code c}. */
+ @Test
+ public void testUnion() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> firstData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+     DataSet<Tuple3<Integer, Long, String>> secondData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+
+     Table first = tEnv.fromDataSet(firstData, "a, b, c");
+     Table second = tEnv.fromDataSet(secondData, "a, b, c");
+     Table combined = first.unionAll(second).select("c");
+
+     // Each value is expected twice, once per union input.
+     List<Row> actual = tEnv.toDataSet(combined, Row.class).collect();
+     String expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /** Filters the result of a {@code unionAll} with {@code b < 2} before selecting column {@code c}. */
+ @Test
+ public void testUnionWithFilter() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> firstData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> secondData = CollectionDataSets.get5TupleDataSet(execEnv);
+
+     Table first = tEnv.fromDataSet(firstData, "a, b, c");
+     Table second = tEnv.fromDataSet(secondData, "a, b, d, c, e").select("a, b, c");
+     Table combined = first.unionAll(second).where("b < 2").select("c");
+
+     // The 5-tuple input is projected to a, b, c so that it matches the first input's fields.
+     List<Row> actual = tEnv.toDataSet(combined, Row.class).collect();
+     String expected = "Hi\n" + "Hallo\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /** A union of a three-field table with a five-field table must be rejected. */
+ @Test(expected = IllegalArgumentException.class)
+ public void testUnionIncompatibleNumberOfFields() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> firstData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> secondData = CollectionDataSets.get5TupleDataSet(execEnv);
+     Table first = tEnv.fromDataSet(firstData, "a, b, c");
+     Table second = tEnv.fromDataSet(secondData, "d, e, f, g, h");
+
+     // Must fail: the union inputs have a different number of fields.
+     first.unionAll(second);
+ }
+
+ /** A union of tables whose field names differ must be rejected. */
+ @Test(expected = IllegalArgumentException.class)
+ public void testUnionIncompatibleFieldsName() throws Exception {
+     ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+     BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(execEnv);
+
+     DataSet<Tuple3<Integer, Long, String>> firstData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+     DataSet<Tuple3<Integer, Long, String>> secondData = CollectionDataSets.getSmall3TupleDataSet(execEnv);
+     Table first = tEnv.fromDataSet(firstData, "a, b, c");
+     Table second = tEnv.fromDataSet(secondData, "a, b, d");
+
+     // Must fail: the third field is named c in one input and d in the other.
+     first.unionAll(second);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testUnionIncompatibleFieldTypes() throws Exception {
+     // Negative test: same field names (a, b, c) on both sides, but c differs in type.
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+     DataSet<Tuple3<Integer, Long, String>> small3 = CollectionDataSets.getSmall3TupleDataSet(env);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> full5 = CollectionDataSets.get5TupleDataSet(env);
+     Table left = tableEnv.fromDataSet(small3, "a, b, c");
+     Table right = tableEnv.fromDataSet(full5, "a, b, c, d, e").select("a, b, c");
+
+     // Must fail: c is a String on the left and (assuming positional naming) an Integer on the right.
+     left.unionAll(right);
+ }
+
+ @Test
+ public void testUnionWithAggregation() throws Exception {
+     // Unions two tables and counts field c over the combined rows.
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+     DataSet<Tuple3<Integer, Long, String>> small3 = CollectionDataSets.getSmall3TupleDataSet(env);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> full5 = CollectionDataSets.get5TupleDataSet(env);
+
+     // The 5-field input is projected to (a, b, c) so both union inputs expose the same field names.
+     Table left = tableEnv.fromDataSet(small3, "a, b, c");
+     Table right = tableEnv.fromDataSet(full5, "a, b, d, c, e").select("a, b, c");
+     Table counted = left.unionAll(right).select("c.count");
+
+     List<Row> rows = tableEnv.toDataSet(counted, Row.class).collect();
+     String expectedRows = "18";
+     compareResultAsText(rows, expectedRows);
+ }
+
+ @Test
+ public void testUnionWithJoin() throws Exception {
+     // Joins the union of two tables with a third table on a === a2 and projects (c, c2).
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+     DataSet<Tuple3<Integer, Long, String>> small3 = CollectionDataSets.getSmall3TupleDataSet(env);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> full5 = CollectionDataSets.get5TupleDataSet(env);
+     DataSet<Tuple5<Integer, Long, Integer, String, Long>> small5 = CollectionDataSets.getSmall5TupleDataSet(env);
+
+     // The join side uses the names a2/b2/c2, which are distinct from the union's a/b/c.
+     Table unionLeft = tableEnv.fromDataSet(small3, "a, b, c");
+     Table unionRight = tableEnv.fromDataSet(full5, "a, b, d, c, e").select("a, b, c");
+     Table joinSide = tableEnv.fromDataSet(small5, "a2, b2, d2, c2, e2").select("a2, b2, c2");
+
+     Table joined = unionLeft.unionAll(unionRight).join(joinSide).where("a === a2").select("c, c2");
+     List<Row> rows = tableEnv.toDataSet(joined, Row.class).collect();
+     String expectedRows = "Hi,Hallo\n" + "Hallo,Hallo\n" +
+         "Hello,Hallo Welt\n" + "Hello,Hallo Welt wie\n" +
+         "Hallo Welt,Hallo Welt\n" + "Hallo Welt wie,Hallo Welt\n" +
+         "Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n";
+     compareResultAsText(rows, expectedRows);
+ }
+
+ @Test(expected = TableException.class)
+ public void testUnionTablesFromDifferentEnvs() throws Exception {
+     // Negative test: the two tables come from two separate table environments over one ExecutionEnvironment.
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment firstTableEnv = TableEnvironment.getTableEnvironment(env);
+     final BatchTableEnvironment secondTableEnv = TableEnvironment.getTableEnvironment(env);
+
+     DataSet<Tuple3<Integer, Long, String>> first = CollectionDataSets.getSmall3TupleDataSet(env);
+     DataSet<Tuple3<Integer, Long, String>> second = CollectionDataSets.getSmall3TupleDataSet(env);
+     Table left = firstTableEnv.fromDataSet(first, "a, b, c");
+     Table right = secondTableEnv.fromDataSet(second, "a, b, c");
+
+     // Must fail: the tables are bound to different TableEnvironments.
+     left.unionAll(right);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java
deleted file mode 100644
index 31a5d53..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/BatchSQLITCase.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.sql.test;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class BatchSQLITCase extends TableProgramsTestBase {
-
- public BatchSQLITCase(TestExecutionMode mode, TableConfigMode configMode) {
- super(mode, configMode);
- }
-
- @Test
- public void testSelectFromTable() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- Table in = tableEnv.fromDataSet(ds, "a,b,c");
- tableEnv.registerTable("T", in);
-
- String sqlQuery = "SELECT a, c FROM T";
- Table result = tableEnv.sql(sqlQuery);
-
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
- String expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
- "4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n" +
- "7,Comment#1\n" + "8,Comment#2\n" + "9,Comment#3\n" + "10,Comment#4\n" +
- "11,Comment#5\n" + "12,Comment#6\n" + "13,Comment#7\n" +
- "14,Comment#8\n" + "15,Comment#9\n" + "16,Comment#10\n" +
- "17,Comment#11\n" + "18,Comment#12\n" + "19,Comment#13\n" +
- "20,Comment#14\n" + "21,Comment#15\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testFilterFromDataSet() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- tableEnv.registerDataSet("DataSetTable", ds, "x, y, z");
-
- String sqlQuery = "SELECT x FROM DataSetTable WHERE z LIKE '%Hello%'";
- Table result = tableEnv.sql(sqlQuery);
-
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
- String expected = "2\n" + "3\n" + "4";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testAggregation() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- tableEnv.registerDataSet("AggTable", ds, "x, y, z");
-
- String sqlQuery = "SELECT sum(x), min(x), max(x), count(y), avg(x) FROM AggTable";
- Table result = tableEnv.sql(sqlQuery);
-
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
- String expected = "231,1,21,21,11";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testJoin() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
- tableEnv.registerDataSet("t1", ds1, "a, b, c");
- tableEnv.registerDataSet("t2",ds2, "d, e, f, g, h");
-
- String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
- Table result = tableEnv.sql(sqlQuery);
-
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
- String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
- compareResultAsText(results, expected);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java
deleted file mode 100644
index 8e420f2..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/sql/test/StreamingSQLITCase.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.sql.test;
-
-import org.apache.flink.api.java.table.StreamTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.api.java.table.test.utils.StreamTestData;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class StreamingSQLITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- public void testSelect() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
- StreamITCase.clear();
-
- DataStream<Tuple3<Integer, Long, String>> ds = StreamTestData.getSmall3TupleDataSet(env);
- Table in = tableEnv.fromDataStream(ds, "a,b,c");
- tableEnv.registerTable("MyTable", in);
-
- String sqlQuery = "SELECT STREAM * FROM MyTable";
- Table result = tableEnv.sql(sqlQuery);
-
- DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
- resultSet.addSink(new StreamITCase.StringSink());
- env.execute();
-
- List<String> expected = new ArrayList();
- expected.add("1,1,Hi");
- expected.add("2,2,Hello");
- expected.add("3,2,Hello world");
-
- StreamITCase.compareWithList(expected);
- }
-
- @Test
- public void testFilter() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
- StreamITCase.clear();
-
- DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
- tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
-
- String sqlQuery = "SELECT STREAM a, b, e FROM MyTable WHERE c < 4";
- Table result = tableEnv.sql(sqlQuery);
-
- DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
- resultSet.addSink(new StreamITCase.StringSink());
- env.execute();
-
- List<String> expected = new ArrayList();
- expected.add("1,1,1");
- expected.add("2,2,2");
- expected.add("2,3,1");
- expected.add("3,4,2");
-
- StreamITCase.compareWithList(expected);
- }
-
- @Test
- public void testUnion() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
- StreamITCase.clear();
-
- DataStream<Tuple3<Integer, Long, String>> ds1 = StreamTestData.getSmall3TupleDataSet(env);
- Table t1 = tableEnv.fromDataStream(ds1, "a,b,c");
- tableEnv.registerTable("T1", t1);
-
- DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds2 = StreamTestData.get5TupleDataStream(env);
- tableEnv.registerDataStream("T2", ds2, "a, b, d, c, e");
-
- String sqlQuery = "SELECT STREAM * FROM T1 " +
- "UNION ALL " +
- "(SELECT STREAM a, b, c FROM T2 WHERE a < 3)";
- Table result = tableEnv.sql(sqlQuery);
-
- DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
- resultSet.addSink(new StreamITCase.StringSink());
- env.execute();
-
- List<String> expected = new ArrayList();
- expected.add("1,1,Hi");
- expected.add("2,2,Hello");
- expected.add("3,2,Hello world");
- expected.add("1,1,Hallo");
- expected.add("2,2,Hallo Welt");
- expected.add("2,3,Hallo Welt wie");
-
- StreamITCase.compareWithList(expected);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
new file mode 100644
index 0000000..4161b1e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.stream.sql;
+
+import org.apache.flink.api.java.table.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.api.java.stream.utils.StreamTestData;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SqlITCase extends StreamingMultipleProgramsTestBase {
+    // Streaming SQL tests: each case runs one query into a StringSink and checks it via compareWithList.
+    @Test
+    public void testSelect() throws Exception {
+        // SELECT * over a table registered from the three-record Tuple3 stream.
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+        StreamITCase.clear();
+
+        DataStream<Tuple3<Integer, Long, String>> ds = StreamTestData.getSmall3TupleDataSet(env);
+        Table in = tableEnv.fromDataStream(ds, "a,b,c");
+        tableEnv.registerTable("MyTable", in);
+
+        String sqlQuery = "SELECT STREAM * FROM MyTable";
+        Table result = tableEnv.sql(sqlQuery);
+        DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+        resultSet.addSink(new StreamITCase.StringSink());
+        env.execute();
+
+        List<String> expected = new ArrayList<>();
+        expected.add("1,1,Hi");
+        expected.add("2,2,Hello");
+        expected.add("3,2,Hello world");
+
+        StreamITCase.compareWithList(expected);
+    }
+
+    @Test
+    public void testFilter() throws Exception {
+        // Projects a, b, e from a stream registered directly as a table and keeps the rows with c < 4.
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+        StreamITCase.clear();
+
+        DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+        tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
+
+        String sqlQuery = "SELECT STREAM a, b, e FROM MyTable WHERE c < 4";
+        Table result = tableEnv.sql(sqlQuery);
+        DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+        resultSet.addSink(new StreamITCase.StringSink());
+        env.execute();
+
+        List<String> expected = new ArrayList<>();
+        expected.add("1,1,1");
+        expected.add("2,2,2");
+        expected.add("2,3,1");
+        expected.add("3,4,2");
+
+        StreamITCase.compareWithList(expected);
+    }
+
+    @Test
+    public void testUnion() throws Exception {
+        // UNION ALL of T1 with the (a, b, c) projection of T2 restricted to a < 3.
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+        StreamITCase.clear();
+
+        DataStream<Tuple3<Integer, Long, String>> ds1 = StreamTestData.getSmall3TupleDataSet(env);
+        Table t1 = tableEnv.fromDataStream(ds1, "a,b,c");
+        tableEnv.registerTable("T1", t1);
+
+        DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds2 = StreamTestData.get5TupleDataStream(env);
+        tableEnv.registerDataStream("T2", ds2, "a, b, d, c, e");
+
+        String sqlQuery = "SELECT STREAM * FROM T1 " +
+            "UNION ALL " +
+            "(SELECT STREAM a, b, c FROM T2 WHERE a < 3)";
+        Table result = tableEnv.sql(sqlQuery);
+        DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+        resultSet.addSink(new StreamITCase.StringSink());
+        env.execute();
+
+        List<String> expected = new ArrayList<>();
+        expected.add("1,1,Hi");
+        expected.add("2,2,Hello");
+        expected.add("3,2,Hello world");
+        expected.add("1,1,Hallo");
+        expected.add("2,2,Hallo Welt");
+        expected.add("2,3,Hallo Welt wie");
+
+        StreamITCase.compareWithList(expected);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/utils/StreamTestData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/utils/StreamTestData.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/utils/StreamTestData.java
new file mode 100644
index 0000000..82ebf95
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/utils/StreamTestData.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.stream.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class StreamTestData {
+    // Fixed in-memory DataStream fixtures; each method builds its stream from a local list.
+    public static DataStream<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env) {
+        // Three (Integer, Long, String) records.
+        final List<Tuple3<Integer, Long, String>> tuples = new ArrayList<>();
+        tuples.add(new Tuple3<>(1, 1L, "Hi"));
+        tuples.add(new Tuple3<>(2, 2L, "Hello"));
+        tuples.add(new Tuple3<>(3, 2L, "Hello world"));
+        // Random order on every call; presumably so that consumers cannot rely on element order.
+        Collections.shuffle(tuples);
+
+        return env.fromCollection(tuples);
+    }
+
+    public static DataStream<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataStream(StreamExecutionEnvironment env) {
+        // Fifteen (Integer, Long, Integer, String, Long) records, kept in insertion order (no shuffle).
+        final List<Tuple5<Integer, Long, Integer, String, Long>> tuples = new ArrayList<>();
+        tuples.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+        tuples.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+        tuples.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+        tuples.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+        tuples.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+        tuples.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+        tuples.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+        tuples.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+        tuples.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+        tuples.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+        tuples.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+        tuples.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+        tuples.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+        tuples.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
+        tuples.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+        return env.fromCollection(tuples);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
deleted file mode 100644
index 35edb09..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.table.ExpressionParserException;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.plan.PlanGenException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.apache.flink.examples.java.JavaTableExample.WC;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class AggregationsITCase extends MultipleProgramsTestBase {
-
- public AggregationsITCase(TestExecutionMode mode){
- super(mode);
- }
-
- @Test
- public void testAggregationTypes() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-
- Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "231,1,21,21,11";
- compareResultAsText(results, expected);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testAggregationOnNonExistingField() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- Table table =
- tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-
- Table result =
- table.select("foo.avg");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testWorkingAggregationDataTypes() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
- env.fromElements(
- new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
- new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
-
- Table table = tableEnv.fromDataSet(input);
-
- Table result =
- table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "1,1,1,1,1.5,1.5,2";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testAggregationWithArithmetic() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSource<Tuple2<Float, String>> input =
- env.fromElements(
- new Tuple2<>(1f, "Hello"),
- new Tuple2<>(2f, "Ciao"));
-
- Table table =
- tableEnv.fromDataSet(input);
-
- Table result =
- table.select("(f0 + 2).avg + 2, f1.count + 5");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "5.5,7";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testAggregationWithTwoCount() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSource<Tuple2<Float, String>> input =
- env.fromElements(
- new Tuple2<>(1f, "Hello"),
- new Tuple2<>(2f, "Ciao"));
-
- Table table =
- tableEnv.fromDataSet(input);
-
- Table result =
- table.select("f0.count, f1.count");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "2,2";
- compareResultAsText(results, expected);
- }
-
- @Test(expected = PlanGenException.class)
- public void testNonWorkingDataTypes() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
-
- Table table =
- tableEnv.fromDataSet(input);
-
- Table result =
- // Must fail. Cannot compute SUM aggregate on String field.
- table.select("f1.sum");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "";
- compareResultAsText(results, expected);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testNoNestedAggregation() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
-
- Table table =
- tableEnv.fromDataSet(input);
-
- Table result =
- // Must fail. Aggregation on aggregation not allowed.
- table.select("f0.sum.sum");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testPojoAggregation() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
- DataSet<WC> input = env.fromElements(
- new WC("Hello", 1),
- new WC("Ciao", 1),
- new WC("Hello", 1),
- new WC("Hola", 1),
- new WC("Hola", 1));
-
- Table table = tableEnv.fromDataSet(input);
-
- Table filtered = table
- .groupBy("word")
- .select("word.count as count, word")
- .filter("count = 2");
-
- List<String> result = tableEnv.toDataSet(filtered, WC.class)
- .map(new MapFunction<WC, String>() {
- public String map(WC value) throws Exception {
- return value.word;
- }
- }).collect();
- String expected = "Hello\n" + "Hola";
- compareResultAsText(result, expected);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
deleted file mode 100644
index 2a17c12..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import java.util.List;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class CastingITCase extends TableProgramsTestBase {
-
- public CastingITCase(TestExecutionMode mode, TableConfigMode configMode){
- super(mode, configMode);
- }
-
- @Test
- public void testNumericAutocastInArithmetic() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple8<Byte, Short, Integer, Long, Float, Double, Long, Double>> input =
- env.fromElements(new Tuple8<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, 1L, 1001.1));
-
- Table table =
- tableEnv.fromDataSet(input);
-
- Table result = table.select("f0 + 1, f1 +" +
- " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1, f6 + 1.0d, f7 + f0");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testNumericAutocastInComparison() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple6<Byte, Short, Integer, Long, Float, Double>> input =
- env.fromElements(
- new Tuple6<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d),
- new Tuple6<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d));
-
- Table table =
- tableEnv.fromDataSet(input, "a,b,c,d,e,f");
-
- Table result = table
- .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "2,2,2,2,2.0,2.0";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testCasting() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple4<Integer, Double, Long, Boolean>> input =
- env.fromElements(new Tuple4<>(1, 0.0, 1L, true));
-
- Table table =
- tableEnv.fromDataSet(input);
-
- Table result = table.select(
- // * -> String
- "f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)," +
- // NUMERIC TYPE -> Boolean
- "f0.cast(BOOL), f1.cast(BOOL), f2.cast(BOOL)," +
- // NUMERIC TYPE -> NUMERIC TYPE
- "f0.cast(DOUBLE), f1.cast(INT), f2.cast(SHORT)," +
- // Boolean -> NUMERIC TYPE
- "f3.cast(DOUBLE)," +
- // identity casting
- "f0.cast(INT), f1.cast(DOUBLE), f2.cast(LONG), f3.cast(BOOL)");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "1,0.0,1,true," +
- "true,false,true," +
- "1.0,0,1," +
- "1.0," +
- "1,0.0,1,true\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testCastFromString() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple3<String, String, String>> input =
- env.fromElements(new Tuple3<>("1", "true", "2.0"));
-
- Table table =
- tableEnv.fromDataSet(input);
-
- Table result = table.select(
- "f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "1,1,1,1,2.0,2.0,true\n";
- compareResultAsText(results, expected);
- }
-
- @Ignore // Date type not supported yet
- @Test
- public void testCastDateFromString() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple4<String, String, String, String>> input =
- env.fromElements(new Tuple4<>("2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775"));
-
- Table table =
- tableEnv.fromDataSet(input);
-
- Table result = table
- .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1, f2.cast(DATE) AS f2, f3.cast(DATE) AS f3")
- .select("f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," +
- "1970-01-17 17:47:53.775\n";
- compareResultAsText(results, expected);
- }
-
- @Ignore // Date type not supported yet
- @Test
- public void testCastDateToStringAndLong() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple2<String, String>> input =
- env.fromElements(new Tuple2<>("2011-05-03 15:51:36.000", "1304437896000"));
-
- Table table =
- tableEnv.fromDataSet(input);
-
- Table result = table
- .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1")
- .select("f0.cast(STRING), f0.cast(LONG), f1.cast(STRING), f1.cast(LONG)");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "2011-05-03 15:51:36.000,1304437896000,2011-05-03 15:51:36.000,1304437896000\n";
- compareResultAsText(results, expected);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java
deleted file mode 100644
index aca0c30..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import java.util.List;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class DistinctITCase extends MultipleProgramsTestBase {
-
- public DistinctITCase(TestExecutionMode mode){
- super(mode);
- }
-
- @Test
- public void testDistinct() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
- Table table = tableEnv.fromDataSet(input, "a, b, c");
-
- Table distinct = table.select("b").distinct();
-
- DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
- List<Row> results = ds.collect();
- String expected = "1\n" + "2\n" + "3\n"+ "4\n"+ "5\n"+ "6\n";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testDistinctAfterAggregate() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env);
-
- Table table = tableEnv.fromDataSet(input, "a, b, c, d, e");
-
- Table distinct = table.groupBy("a, e").select("e").distinct();
-
- DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
- List<Row> results = ds.collect();
- String expected = "1\n" + "2\n" + "3\n";
- compareResultAsText(results, expected);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
deleted file mode 100644
index 4d6adfa..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table.test;
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.codegen.CodeGenException;
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
-import static org.junit.Assert.fail;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class ExpressionsITCase extends TableProgramsTestBase {
-
- public ExpressionsITCase(TestExecutionMode mode, TableConfigMode configMode) {
- super(mode, configMode);
- }
-
- @Test
- public void testArithmetic() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple2<Integer, Integer>> input =
- env.fromElements(new Tuple2<>(5, 10));
-
- Table table =
- tableEnv.fromDataSet(input, "a, b");
-
- Table result = table.select(
- "a - 5, a + 5, a / 2, a * 2, a % 2, -a");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "0,10,2,10,1,-5";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testLogic() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple2<Integer, Boolean>> input =
- env.fromElements(new Tuple2<>(5, true));
-
- Table table =
- tableEnv.fromDataSet(input, "a, b");
-
- Table result = table.select(
- "b && true, b && false, b || false, !b");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "true,false,true,false";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testComparisons() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple3<Integer, Integer, Integer>> input =
- env.fromElements(new Tuple3<>(5, 5, 4));
-
- Table table =
- tableEnv.fromDataSet(input, "a, b, c");
-
- Table result = table.select(
- "a > c, a >= b, a < c, a.isNull, a.isNotNull");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "true,true,false,false,true";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testNullLiteral() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple2<Integer, Integer>> input =
- env.fromElements(new Tuple2<>(1, 0));
-
- Table table =
- tableEnv.fromDataSet(input, "a, b");
-
- Table result = table.select("a, b, Null(INT), Null(STRING) === ''");
-
- try {
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- if (!config().getNullCheck()) {
- fail("Exception expected if null check is disabled.");
- }
- List<Row> results = ds.collect();
- String expected = "1,0,null,null";
- compareResultAsText(results, expected);
- }
- catch (CodeGenException e) {
- if (config().getNullCheck()) {
- throw e;
- }
- }
- }
-
- @Test
- public void testEval() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple2<Integer, Boolean>> input =
- env.fromElements(new Tuple2<>(5, true));
-
- Table table =
- tableEnv.fromDataSet(input, "a, b");
-
- Table result = table.select(
- "(b && true).eval('true', 'false')," +
- "false.eval('true', 'false')," +
- "true.eval(true.eval(true.eval(10, 4), 4), 4)");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "true,false,10";
- compareResultAsText(results, expected);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testEvalInvalidTypes() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple2<Integer, Boolean>> input =
- env.fromElements(new Tuple2<>(5, true));
-
- Table table =
- tableEnv.fromDataSet(input, "a, b");
-
- Table result = table.select("(b && true).eval(5, 'false')");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "true,false,3,10";
- compareResultAsText(results, expected);
- }
-
- @Test
- public void testComplexExpression() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- DataSource<Tuple3<Integer, Integer, Integer>> input =
- env.fromElements(new Tuple3<>(5, 5, 4));
-
- Table table =
- tableEnv.fromDataSet(input, "a, b, c");
-
- Table result = table.select(
- "a.isNull().isNull," +
- "a.abs() + a.abs().abs().abs().abs()," +
- "a.cast(STRING) + a.cast(STRING)," +
- "CAST(ISNULL(b), INT)," +
- "ISNULL(CAST(b, INT).abs()) === false," +
- "((((true) === true) || false).cast(STRING) + 'X ').trim");
-
- DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
- List<Row> results = ds.collect();
- String expected = "false,10,55,0,true,trueX";
- compareResultAsText(results, expected);
- }
-
-}
-
|