spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-19314][SS][CATALYST] Do not allow sort before aggregation in Structured Streaming plan
Date Fri, 20 Jan 2017 22:03:36 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 7bc3e9ba7 -> 482d361c3


[SPARK-19314][SS][CATALYST] Do not allow sort before aggregation in Structured Streaming plan

## What changes were proposed in this pull request?

Sort in a streaming plan should be allowed only after a aggregation in complete mode. Currently
it is incorrectly allowed when present anywhere in the plan. It gives unpredictable potentially
incorrect results.

## How was this patch tested?
New test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16662 from tdas/SPARK-19314.

(cherry picked from commit 552e5f08841828e55f5924f1686825626da8bcd0)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/482d361c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/482d361c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/482d361c

Branch: refs/heads/branch-2.1
Commit: 482d361c36b5d0e093f931e27701fb59488ad583
Parents: 7bc3e9b
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Fri Jan 20 14:04:51 2017 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Fri Jan 20 14:05:05 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/analysis/UnsupportedOperationChecker.scala | 2 +-
 .../sql/catalyst/analysis/UnsupportedOperationsSuite.scala  | 9 +++++++--
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/482d361c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index c2666b2..f4d016c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -87,7 +87,7 @@ object UnsupportedOperationChecker {
      * data.
      */
     def containsCompleteData(subplan: LogicalPlan): Boolean = {
-      val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
+      val aggs = subplan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
       // Either the subplan has no streaming source, or it has aggregation with Complete
mode
       !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/482d361c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 58e69f9..dcdb1ae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -199,13 +199,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.intersect(_),
     streamStreamSupported = false)
 
-  // Sort: supported only on batch subplans and on aggregation + complete output mode
+  // Sort: supported only on batch subplans and after aggregation on streaming plan + complete
mode
   testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
   assertSupportedInStreamingPlan(
-    "sort - sort over aggregated data in Complete output mode",
+    "sort - sort after aggregation in Complete output mode",
     streamRelation.groupBy()(Count("*")).sortBy(),
     Complete)
   assertNotSupportedInStreamingPlan(
+    "sort - sort before aggregation in Complete output mode",
+    streamRelation.sortBy().groupBy()(Count("*")),
+    Complete,
+    Seq("sort", "aggregat", "complete"))
+  assertNotSupportedInStreamingPlan(
     "sort - sort over aggregated data in Update output mode",
     streamRelation.groupBy()(Count("*")).sortBy(),
     Update,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message