spark-issues mailing list archives

From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24066) Add new optimization rule to eliminate unnecessary sort by exchanged adjacent Window expressions
Date Mon, 05 Nov 2018 10:34:01 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674930#comment-16674930 ]

Apache Spark commented on SPARK-24066:
--------------------------------------

User 'heary-cao' has created a pull request for this issue:
https://github.com/apache/spark/pull/22945

> Add new optimization rule to eliminate unnecessary sort by exchanged adjacent Window expressions
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24066
>                 URL: https://issues.apache.org/jira/browse/SPARK-24066
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: caoxuewen
>            Priority: Major
>
> Currently, when two adjacent window functions have the same partition spec and compatible orderings (for example, one ORDER BY list is a prefix of the other), the plan sorts twice after the shuffle, which is unnecessary.
> This PR adds a new optimization rule that eliminates the unnecessary sort by exchanging adjacent Window expressions.
> For example:
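> // Assumes a spark-shell session, where spark.implicits._ (for toDF and $) is already in scope.
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions.{avg, max, sum}
>
> val df = Seq(
>       ("a", "p1", 10.0, 20.0, 30.0),
>       ("a", "p2", 20.0, 10.0, 40.0)).toDF("key", "value", "value1", "value2", "value3").select(
>         $"key",
>         sum("value1").over(Window.partitionBy("key").orderBy("value")),
>         max("value2").over(Window.partitionBy("key").orderBy("value", "value1")),
>         avg("value3").over(Window.partitionBy("key").orderBy("value", "value1", "value2"))
>       ).queryExecution.executedPlan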
>  
> Before this PR:
> *(5) Project [key#16, sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
> +- Window [max(value2#19) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST]
>    +- *(4) Project [key#16, value1#18, value#17, value2#19, sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
>       +- Window [avg(value3#20) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST]
>          +- *(3) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST], false, 0
>             +- Window [sum(value1#18) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29], [key#16], [value#17 ASC NULLS FIRST]
>                +- *(2) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST], false, 0
>                   +- Exchange hashpartitioning(key#16, 5)
>                      +- *(1) Project [_1#5 AS key#16, _3#7 AS value1#18, _2#6 AS value#17, _4#8 AS value2#19, _5#9 AS value3#20]
>                         +- LocalTableScan [_1#5, _2#6, _3#7, _4#8, _5#9]
>  
> After this PR:
> *(5) Project [key#16, sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
> +- Window [sum(value1#18) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29], [key#16], [value#17 ASC NULLS FIRST]
>    +- *(4) Project [key#16, value1#18, value#17, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31, max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30]
>       +- Window [max(value2#19) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST]
>          +- *(3) Project [key#16, value1#18, value#17, value2#19, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
>             +- Window [avg(value3#20) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST]
>                +- *(2) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST], false, 0
>                   +- Exchange hashpartitioning(key#16, 5)
>                      +- *(1) Project [_1#5 AS key#16, _3#7 AS value1#18, _2#6 AS value#17, _4#8 AS value2#19, _5#9 AS value3#20]
>                         +- LocalTableScan [_1#5, _2#6, _3#7, _4#8, _5#9]
>  
>  
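
The effect visible in the two plans above can be illustrated with a small, self-contained sketch. This is not the rule implemented in the pull request and it does not use Spark's optimizer API: WindowSpec, requiredSort, sortsNeeded and reorder are hypothetical names, and the model assumes that all windows share one partition spec, that their ORDER BY lists are prefixes of one another, and that sort direction and null ordering are identical and can be ignored. Under those assumptions, an existing sort can be reused whenever its key list starts with the keys the next window requires, so evaluating the window with the longest ORDER BY first leaves a single sort.

```scala
// Hypothetical, simplified model for illustration; not Spark's optimizer API.
final case class WindowSpec(name: String, partitionBy: Seq[String], orderBy: Seq[String]) {
  // Ordering a physical Sort must provide for this window: partition keys, then order keys.
  def requiredSort: Seq[String] = partitionBy ++ orderBy
}

object WindowSortSketch {
  // Counts the sorts needed when windows are evaluated in sequence (first element runs first).
  // A new sort is needed unless the current ordering starts with the required one.
  def sortsNeeded(windows: Seq[WindowSpec]): Int = {
    val (_, count) = windows.foldLeft((Option.empty[Seq[String]], 0)) {
      case ((current, n), w) =>
        if (current.exists(_.startsWith(w.requiredSort))) (current, n)
        else (Some(w.requiredSort), n + 1)
    }
    count
  }

  // Evaluates the window with the longest ORDER BY first. Only meaningful under the
  // stated assumption that the orderings are prefixes of one another.
  def reorder(windows: Seq[WindowSpec]): Seq[WindowSpec] =
    windows.sortBy(w => -w.orderBy.length)

  // Evaluation order taken from the "Before this PR" plan, read bottom-up: sum, avg, max.
  val example: Seq[WindowSpec] = Seq(
    WindowSpec("sum", Seq("key"), Seq("value")),
    WindowSpec("avg", Seq("key"), Seq("value", "value1", "value2")),
    WindowSpec("max", Seq("key"), Seq("value", "value1"))
  )
}
```

In this model, WindowSortSketch.sortsNeeded(WindowSortSketch.example) gives 2, matching the two Sort nodes in the first plan, and applying reorder first gives 1, matching the second plan.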



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


