spark-issues mailing list archives

From "Mayur Bhosale (Jira)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-30528) DPP issues
Date Wed, 22 Jan 2020 09:00:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-30528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020803#comment-17020803 ]

Mayur Bhosale edited comment on SPARK-30528 at 1/22/20 8:59 AM:
----------------------------------------------------------------

Thanks for the explanation, [~maryannxue].
 # Should DPP be turned off by default until the heuristics are improved? Or should it stay
on by default but be skipped when column-level stats are not available? In some cases
applying DPP can be really disastrous.
 # Can we use a Bloom filter to store the pruning values (for joins other than Broadcast
Hash Join)? This would have several advantages:
 ## The result returned to the driver would be much smaller.
 ## Lookups would be faster than with a HashSet.
 ## The exchange would be reused (because we would not add an Aggregate on top).
 ## Duplicate subqueries caused by multiple join conditions on partitioned columns would be
removed (cases like example 3 in the description above).

         This will need more thought, though. Let me know if this sounds feasible and
useful; I can then come back with more details and pick it up as well.
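
As a rough illustration of point 2, here is a minimal, self-contained sketch of a Bloom filter that holds pruning values. It is not Spark's implementation: SimpleBloomFilter, BloomPruningDemo, the sizing parameters and the sample keys are all hypothetical. A real change would more likely build on Spark's existing org.apache.spark.util.sketch.BloomFilter, but that is an assumption about the eventual design.
{code:scala}
import java.util.BitSet
import scala.util.hashing.MurmurHash3

// Hypothetical minimal Bloom filter for illustration only; not Spark's implementation.
final class SimpleBloomFilter(numBits: Int, numHashes: Int) {
  require(numBits > 0 && numHashes > 0, "numBits and numHashes must be positive")
  private val bits = new BitSet(numBits)

  // Derive numHashes bit positions from two base hashes (double hashing).
  private def positions(value: Long): Seq[Int] = {
    val h1 = MurmurHash3.stringHash(value.toString, 0)
    val h2 = MurmurHash3.stringHash(value.toString, h1)
    (0 until numHashes).map(i => Math.floorMod(h1 + i * h2, numBits))
  }

  def put(value: Long): Unit = positions(value).foreach(i => bits.set(i))

  // Can return true for a value that was never added (false positive),
  // but never returns false for a value that was added.
  def mightContain(value: Long): Boolean = positions(value).forall(i => bits.get(i))

  // Size of the bit array in bytes; it does not grow with the number of values added.
  def sizeInBytes: Int = (numBits + 7) / 8
}

object BloomPruningDemo {
  // Hypothetical distinct values of col1 produced by the filtering side of the join.
  val joinKeys: Seq[Long] = Seq(10L, 20L, 30L)
  val filter = new SimpleBloomFilter(numBits = 1024, numHashes = 3)
  joinKeys.foreach(filter.put)

  // Hypothetical partition values of partcol1; only partitions that might match are kept.
  val partitionValues: Seq[Long] = Seq(10L, 15L, 20L, 25L, 30L)
  val kept: Seq[Long] = partitionValues.filter(filter.mightContain)
}
{code}
A false positive here only means that an extra partition is scanned; the join condition still filters the rows, so the result stays correct.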

 3. Yes, one of the subqueries selects `col1` and the other selects `col2`.
{code:java}
== Physical Plan ==
*(5) SortMergeJoin [partcol1#2L, partcol2#3], [col1#5L, col2#6], Inner
:- *(2) Sort [partcol1#2L ASC NULLS FIRST, partcol2#3 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(partcol1#2L, partcol2#3, 200), true, [id=#103]
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.partitionedtable[id#0L,name#1,partCol1#2L,partCol2#3] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[file:/~/src/spark/bin/sp..., PartitionFilters: [isnotnull(partCol2#3), isnotnull(partCol1#2L), dynamicpruningexpression(partCol1#2L IN subquery#..., PushedFilters: [], ReadSchema: struct<id:bigint,name:string>
:              :- Subquery subquery#19, [id=#49]
:              :  +- *(2) HashAggregate(keys=[col1#5L], functions=[])
:              :     +- Exchange hashpartitioning(col1#5L, 200), true, [id=#45]
:              :        +- *(1) HashAggregate(keys=[col1#5L], functions=[])
:              :           +- *(1) Project [col1#5L]
:              :              +- *(1) Filter (((isnotnull(id#4L) AND (id#4L > 0)) AND isnotnull(col2#6)) AND isnotnull(col1#5L))
:              :                 +- *(1) ColumnarToRow
:              :                    +- FileScan parquet default.nonpartitionedtable[id#4L,col1#5L,col2#6] Batched: true, DataFilters: [isnotnull(id#4L), (id#4L > 0), isnotnull(col2#6), isnotnull(col1#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/~/src/spark/bin/spark-wa..., PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,0), IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<id:bigint,col1:bigint,col2:string>
:              +- Subquery subquery#21, [id=#82]
:                 +- *(2) HashAggregate(keys=[col2#6], functions=[])
:                    +- Exchange hashpartitioning(col2#6, 200), true, [id=#78]
:                       +- *(1) HashAggregate(keys=[col2#6], functions=[])
:                          +- *(1) Project [col2#6]
:                             +- *(1) Filter (((isnotnull(id#4L) AND (id#4L > 0)) AND isnotnull(col2#6)) AND isnotnull(col1#5L))
:                                +- *(1) ColumnarToRow
:                                   +- FileScan parquet default.nonpartitionedtable[id#4L,col1#5L,col2#6] Batched: true, DataFilters: [isnotnull(id#4L), (id#4L > 0), isnotnull(col2#6), isnotnull(col1#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/~/src/spark/bin/spark-wa..., PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,0), IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<id:bigint,col1:bigint,col2:string>
+- *(4) Sort [col1#5L ASC NULLS FIRST, col2#6 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col1#5L, col2#6, 200), true, [id=#113]
      +- *(3) Project [id#4L, col1#5L, col2#6, name#7]
         +- *(3) Filter (((isnotnull(id#4L) AND (id#4L > 0)) AND isnotnull(col2#6)) AND isnotnull(col1#5L))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.nonpartitionedtable[id#4L,col1#5L,col2#6,name#7] Batched: true, DataFilters: [isnotnull(id#4L), (id#4L > 0), isnotnull(col2#6), isnotnull(col1#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/~/src/spark/bin/spark-wa..., PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,0), IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<id:bigint,col1:bigint,col2:string,name:string>
{code}
 

           If we decide not to remove the Aggregate (that is, not to use a Bloom filter),
should we combine such DPP subqueries into a single subquery? That would avoid the
duplicate computation.
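
To illustrate the combined-subquery idea, here is a plain Scala collections sketch that uses no Spark APIs. Row, CombinedPruningSketch, pruningKeys and the sample rows are hypothetical stand-ins. The filtering side is traversed once and both pruning key sets are derived from that single pass, instead of one pass per join key.
{code:scala}
// Hypothetical stand-in for a row of nonpartitionedtable; not a Spark type.
final case class Row(id: Long, col1: Long, col2: String)

object CombinedPruningSketch {
  // One pass over the filtering side that collects the distinct values of both join keys.
  def pruningKeys(rows: Iterator[Row]): (Set[Long], Set[String]) =
    rows
      .filter(_.id > 0) // same predicate as the WHERE clause of the example query
      .foldLeft((Set.empty[Long], Set.empty[String])) {
        case ((col1Keys, col2Keys), row) => (col1Keys + row.col1, col2Keys + row.col2)
      }

  // Made-up sample data; the last row is dropped by the id > 0 predicate.
  val sample: Seq[Row] = Seq(Row(1L, 100L, "a"), Row(2L, 100L, "b"), Row(0L, 300L, "c"))
  val (col1Keys, col2Keys) = pruningKeys(sample.iterator)
}
{code}
Each key set can then prune its own partition column, which gives the same pruning as two separate subqueries but with a single scan of the filtering side.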


> DPP issues
> ----------
>
>                 Key: SPARK-30528
>                 URL: https://issues.apache.org/jira/browse/SPARK-30528
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 3.0.0
>            Reporter: Mayur Bhosale
>            Priority: Major
>              Labels: performance
>         Attachments: cases.png, dup_subquery.png, plan.png
>
>
> In DPP, the heuristic that decides whether pruning will be beneficial relies on the sizes of
> the tables in the right subtree of the join. This might not be a correct estimate, especially
> when detailed column-level stats are not available.
> {code:java}
>     // the pruning overhead is the total size in bytes of all scan relations
>     val overhead = otherPlan.collectLeaves().map(_.stats.sizeInBytes).sum.toFloat
>     filterRatio * partPlan.stats.sizeInBytes.toFloat > overhead.toFloat
> {code}
> Also, DPP executes the entire right side of the join as a subquery, which causes multiple
> scans of the tables in the right subtree of the join. This can cause issues when the join is
> not a Broadcast Hash Join (BHJ) and the subquery result is not reused. I also could not
> figure out why the results of the subquery are reused only for BHJ.
>  
> Consider a query,
> {code:java}
> SELECT * 
> FROM   store_sales_partitioned 
>        JOIN (SELECT * 
>              FROM   store_returns_partitioned, 
>                     date_dim 
>              WHERE  sr_returned_date_sk = d_date_sk) ret_date 
>          ON ss_sold_date_sk = d_date_sk 
> WHERE  d_fy_quarter_seq > 0 
> {code}
> DPP will kick in for both joins. (See the attached image plan.png for the plan.)
> Some observations:
>  * Based on the heuristic, DPP goes ahead with pruning if the cost of scanning the tables in
> the right subtree of the join is less than the benefit of pruning. The scan cost is counted
> because multiple scans are needed for a sort-merge join (SMJ). But the heuristic only checks
> whether the benefit offsets the cost of the multiple scans; it does not take into account
> other operations in the right subtree, such as joins, which can be quite expensive. This
> issue is particularly prominent when detailed column-level stats are not available. In the
> example above, the pruningHasBenefit decision was made on the basis of the sizes of the
> tables store_returns_partitioned and date_dim, but it did not take into account the join
> between them that runs before the join with the store_sales_partitioned table.
>  * Multiple scans are needed when the join is an SMJ because the exchanges are not reused.
> This is because an Aggregate is added on top of the right subtree, which is executed as a
> subquery, in order to keep only the required columns. Here, scanning all the columns, as the
> right subtree of the join does, and reusing the same exchange might be more helpful, as it
> avoids duplicate scans.
> This was just a representative example; in general, DPP can cause performance issues in
> cases such as those shown in the attached image cases.png.
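>
> As a rough illustration of the first observation, the check can be modelled in plain Scala
> without any Spark classes. The object name, the simplified signature and the sizes below are
> made up for this sketch; only the comparison itself follows the snippet quoted above.
> {code:scala}
> object PruningHeuristicSketch {
>   // Same comparison as the quoted snippet: estimated benefit versus the summed leaf sizes.
>   def pruningHasBenefit(
>       filterRatio: Double,
>       partPlanSizeInBytes: BigInt,
>       otherLeafSizesInBytes: Seq[BigInt]): Boolean = {
>     val overhead = otherLeafSizesInBytes.sum.toFloat
>     filterRatio * partPlanSizeInBytes.toFloat > overhead
>   }
>
>   // Made-up sizes: a 100 GB partitioned table, and 10 GB and 1 MB tables on the other side.
>   val gb: BigInt = BigInt(1024L * 1024L * 1024L)
>   val mb: BigInt = BigInt(1024L * 1024L)
>   val decision: Boolean = pruningHasBenefit(0.5, gb * 100, Seq(gb * 10, mb))
> }
> {code}
> With these numbers the check passes. The join between the two tables on the other side never
> enters the comparison, so its cost cannot change the decision.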
>  
> Also, when there are multiple DPP-compatible join conditions in the same join, the entire
> right subtree of the join is executed as a subquery once per condition. Consider this
> example:
> {code:java}
> SELECT * 
> FROM   partitionedtable 
>        JOIN nonpartitionedtable 
>          ON partcol1 = col1 
>             AND partcol2 = col2 
> WHERE  nonpartitionedtable.id > 0 
> {code}
> Here the right subtree of the join (the scan of table nonpartitionedtable) would be executed
> twice as a subquery, once for each join condition. These two subqueries should be combined
> and executed only once, as they are almost the same apart from the columns that they
> prune. See the attached image dup_subquery.png for details.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

