flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [20/50] [abbrv] flink git commit: [FLINK-3564] [table] Implement distinct() for Table API
Date Fri, 18 Mar 2016 13:48:14 GMT
[FLINK-3564] [table] Implement distinct() for Table API

This closes #1754


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cab28d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cab28d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cab28d6

Branch: refs/heads/master
Commit: 7cab28d62b280a706c388fea145cb2d81a7fea40
Parents: 49699f0
Author: twalthr <twalthr@apache.org>
Authored: Wed Mar 2 13:19:38 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Fri Mar 18 14:44:50 2016 +0100

----------------------------------------------------------------------
 docs/apis/batch/libs/table.md                   | 23 ++++++
 .../org/apache/flink/api/table/table.scala      | 23 ++++--
 .../api/java/table/test/DistinctITCase.java     | 75 ++++++++++++++++++++
 .../api/scala/table/test/DistinctITCase.scala   | 60 ++++++++++++++++
 .../flink/api/scala/table/test/JoinITCase.scala |  3 +-
 5 files changed, 178 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7cab28d6/docs/apis/batch/libs/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/table.md b/docs/apis/batch/libs/table.md
index ce4d016..87006c7 100644
--- a/docs/apis/batch/libs/table.md
+++ b/docs/apis/batch/libs/table.md
@@ -238,6 +238,17 @@ Table result = left.union(right);
       </td>
     </tr>
 
+    <tr>
+      <td><strong>Distinct</strong></td>
+      <td>
+        <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p>
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.distinct();
+{% endhighlight %}
+      </td>
+    </tr>
+
   </tbody>
 </table>
 
@@ -340,6 +351,18 @@ val result = left.union(right);
 {% endhighlight %}
       </td>
     </tr>
+
+    <tr>
+      <td><strong>Distinct</strong></td>
+      <td>
+        <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p>
+{% highlight scala %}
+val in = ds.as('a, 'b, 'c);
+val result = in.distinct();
+{% endhighlight %}
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/7cab28d6/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 4efb2a3..0763f34 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -271,6 +271,21 @@ class Table(
   }
 
   /**
+   * Removes duplicate values and returns only distinct (different) values.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.select("key, value").distinct()
+   * }}}
+   */
+  def distinct(): Table = {
+    relBuilder.push(relNode)
+    relBuilder.distinct()
+    new Table(relBuilder.build(), relBuilder)
+  }
+
+  /**
    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
    * operations must not overlap, use [[as]] to rename fields if necessary. You can use
    * where and select clauses after a join to further specify the behaviour of the join.
@@ -299,7 +314,7 @@ class Table(
   }
 
   /**
-   * Union two[[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations
+   * Union two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations
    * must fully overlap.
    *
    * Example:
@@ -334,7 +349,7 @@ class Table(
 
   /**
    * Get the process of the sql parsing, print AST and physical execution plan.The AST
-   * show the structure of the supplied statement. The execution plan shows how the table
+   * show the structure of the supplied statement. The execution plan shows how the table
    * referenced by the statement will be scanned.
    */
   def explain(extended: Boolean): String = {
@@ -369,7 +384,7 @@ class GroupedTable(
     * Example:
     *
     * {{{
-    *   in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10))
+    *   in.select('key, 'value.avg + " The average" as 'average, 'other.substring(1, 10))
     * }}}
     */
   def select(fields: Expression*): Table = {
@@ -412,7 +427,7 @@ class GroupedTable(
     * Example:
     *
     * {{{
-    *   in.select("key, value.avg + " The average" as average, other.substring(0, 10)")
+    *   in.select("key, value.avg + " The average" as average, other.substring(1, 10)")
     * }}}
     */
   def select(fields: String): Table = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7cab28d6/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java
new file mode 100644
index 0000000..b7d689d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class DistinctITCase extends MultipleProgramsTestBase {
+
+	public DistinctITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	@Test
+	public void testDistinct() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table distinct = table.select("b").distinct();
+
+		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1\n" + "2\n" + "3\n"+ "4\n"+ "5\n"+ "6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testDistinctAfterAggregate() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env);
+
+		Table table = tableEnv.fromDataSet(input, "a, b, c, d, e");
+
+		Table distinct = table.groupBy("a, e").select("e").distinct();
+
+		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1\n" + "2\n" + "3\n";
+		compareResultAsText(results, expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cab28d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
new file mode 100644
index 0000000..770819a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/DistinctITCase.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.Row
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
+  @Test
+  def testDistinct(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val distinct = ds.select('b).distinct()
+
+    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+    val results = distinct.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get5TupleDataSet(env).as('a, 'b, 'c, 'd, 'e)
+
+    val distinct = ds.groupBy('a, 'e).select('e).distinct()
+
+    val expected = "1\n" + "2\n" + "3\n"
+    val results = distinct.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cab28d6/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index 7ce77d1..70beaf0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -30,7 +30,6 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
 import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.table.TableException
 
 @RunWith(classOf[Parameterized])
 class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
@@ -72,7 +71,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
     val joinT = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
 
     val expected = "Hello world, how are you?,Hallo Welt wie\n" +
-      "I am fine.,Hallo Welt wie\n"  
+      "I am fine.,Hallo Welt wie\n"
     val results = joinT.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }


Mime
View raw message