flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinch...@apache.org
Subject flink git commit: [FLINK-7126] [table] Support Distinct for Stream SQL and Table API
Date Thu, 13 Jul 2017 05:46:35 GMT
Repository: flink
Updated Branches:
  refs/heads/master 59463278a -> c8ca31aba


[FLINK-7126] [table] Support Distinct for Stream SQL and Table API

This closes #4279


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8ca31ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8ca31ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8ca31ab

Branch: refs/heads/master
Commit: c8ca31aba083304117ed527c5c80cb9ede2b81bd
Parents: 5946327
Author: Jark Wu <jark@apache.org>
Authored: Fri Jul 7 16:10:45 2017 +0800
Committer: Jincheng Sun <jincheng@apache.org>
Committed: Thu Jul 13 11:56:09 2017 +0800

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |  4 +-
 docs/dev/table/tableApi.md                      |  4 +-
 .../flink/table/plan/logical/operators.scala    |  7 ----
 .../api/scala/stream/sql/AggregationsTest.scala | 40 +++++++++++++++++++-
 .../stream/table/GroupAggregationsITCase.scala  | 37 ++++++++++++++++++
 .../scala/stream/table/UnsupportedOpsTest.scala |  7 ----
 6 files changed, 81 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8ca31ab/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 11b7b0e..3b175a9 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -284,12 +284,14 @@ FROM Orders
     <tr>
       <td>
         <strong>Distinct</strong><br>
-        <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span> <br>
+        <span class="label label-info">Result Updating</span>
       </td>
       <td>
 {% highlight sql %}
 SELECT DISTINCT users FROM Orders
 {% endhighlight %}
+       <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
     <tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/c8ca31ab/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 99940dc..fe057cc 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -373,7 +373,8 @@ Table result = orders
     <tr>
       <td>
         <strong>Distinct</strong><br>
-        <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span> <br>
+        <span class="label label-info">Result Updating</span>
       </td>
       <td>
         <p>Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.</p>
@@ -381,6 +382,7 @@ Table result = orders
 Table orders = tableEnv.scan("Orders");
 Table result = orders.distinct();
 {% endhighlight %}
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
       </td>
     </tr>
   </tbody>

http://git-wip-us.apache.org/repos/asf/flink/blob/c8ca31ab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 4077d36..795a506 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -128,13 +128,6 @@ case class Distinct(child: LogicalNode) extends UnaryNode {
     child.construct(relBuilder)
     relBuilder.distinct()
   }
-
-  override def validate(tableEnv: TableEnvironment): LogicalNode = {
-    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      failValidation(s"Distinct on stream tables is currently not supported.")
-    }
-    this
-  }
 }
 
 case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {

http://git-wip-us.apache.org/repos/asf/flink/blob/c8ca31ab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/AggregationsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/AggregationsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/AggregationsTest.scala
index b6a0185..4ab5cf6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/AggregationsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/AggregationsTest.scala
@@ -24,13 +24,13 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.OverAgg0
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
 import org.apache.flink.table.expressions.AggFunctionCall
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.apache.flink.types.Row
-import org.junit.Test
 import org.junit.Assert.{assertEquals, assertTrue}
-
+import org.junit.{Ignore, Test}
 
 class AggregationsTest extends TableTestBase {
   private val streamUtil: StreamTableTestUtil = streamTestUtil()
@@ -49,6 +49,42 @@ class AggregationsTest extends TableTestBase {
   }
 
   @Test
+  def testDistinct(): Unit = {
+    val sql = "SELECT DISTINCT a, b, c FROM MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupAggregate",
+        streamTableNode(0),
+        term("groupBy", "a, b, c"),
+        term("select", "a, b, c")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  // TODO: this query should be optimized to only have a single DataStreamGroupAggregate
+  // TODO: reopen this until FLINK-7144 fixed
+  @Ignore
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+    val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "a")
+        ),
+        term("groupBy", "a"),
+        term("select", "a")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+
+  @Test
   def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = {
     streamUtil.addFunction("udag", new MyAgg)
     val call = streamUtil

http://git-wip-us.apache.org/repos/asf/flink/blob/c8ca31ab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
index 9da2c44..4a3524b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
@@ -38,6 +38,43 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
   private val queryConfig = new StreamQueryConfig()
   queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
 
+
+  @Test
+  def testDistinct(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .select('b).distinct()
+
+    val results = t.toRetractStream[Row](queryConfig)
+    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+    env.execute()
+
+    val expected = mutable.MutableList("1", "2", "3", "4", "5", "6")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .groupBy('e).select('e, 'a.count).distinct()
+
+    val results = t.toRetractStream[Row](queryConfig)
+    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+    env.execute()
+
+    val expected = mutable.MutableList("1,5", "2,7", "3,3")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
   @Test
   def testNonKeyedGroupAggregate(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/c8ca31ab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
index c72249a..0e33d8b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -28,13 +28,6 @@ import org.junit.Test
 class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
 
   @Test(expected = classOf[ValidationException])
-  def testDistinct(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).distinct()
-  }
-
-  @Test(expected = classOf[ValidationException])
   def testSort(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)


Mime
View raw message