spark-issues mailing list archives

From "yucai (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-24343) Avoid shuffle for the bucketed table when shuffle.partition > bucket number
Date Tue, 22 May 2018 09:19:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-24343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yucai updated SPARK-24343:
--------------------------
    Description: 
When shuffle.partition is greater than the number of buckets, Spark shuffles the bucketed
table to match shuffle.partition. Can we avoid this?

See the example below:
{code:java}
CREATE TABLE dev
USING PARQUET
AS SELECT ws_item_sk, i_item_sk
FROM web_sales_bucketed
JOIN item ON ws_item_sk = i_item_sk;{code}
web_sales_bucketed is bucketed into 4 buckets, and shuffle.partition is set to 10.

Currently, both tables are shuffled into 10 partitions.
{code:java}
Execute CreateDataSourceTableAsSelectCommand CreateDataSourceTableAsSelectCommand `dev`, ErrorIfExists, [ws_item_sk#2, i_item_sk#6]
+- *(5) SortMergeJoin [ws_item_sk#2], [i_item_sk#6], Inner
   :- *(2) Sort [ws_item_sk#2 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ws_item_sk#2, 10)
   :     +- *(1) Project [ws_item_sk#2]
   :        +- *(1) Filter isnotnull(ws_item_sk#2)
   :           +- *(1) FileScan parquet tpcds10.web_sales_bucketed[ws_item_sk#2] Batched: true, Format: Parquet, Location:...
   +- *(4) Sort [i_item_sk#6 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(i_item_sk#6, 10)
         +- *(3) Project [i_item_sk#6]
            +- *(3) Filter isnotnull(i_item_sk#6)
               +- *(3) FileScan parquet tpcds10.item[i_item_sk#6] Batched: true, Format: Parquet, Location:...
{code}
A better plan would avoid the shuffle on the bucketed table and shuffle only the item table, into 4 partitions to match the bucket number.
{code:java}
Execute CreateDataSourceTableAsSelectCommand CreateDataSourceTableAsSelectCommand `dev`, ErrorIfExists, [ws_item_sk#2, i_item_sk#6]
+- *(4) SortMergeJoin [ws_item_sk#2], [i_item_sk#6], Inner
   :- *(1) Sort [ws_item_sk#2 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [ws_item_sk#2]
   :     +- *(1) Filter isnotnull(ws_item_sk#2)
   :        +- *(1) FileScan parquet tpcds10.web_sales_bucketed[ws_item_sk#2] Batched: true, Format: Parquet, Location:...
   +- *(3) Sort [i_item_sk#6 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(i_item_sk#6, 4)
         +- *(2) Project [i_item_sk#6]
            +- *(2) Filter isnotnull(i_item_sk#6)
               +- *(2) FileScan parquet tpcds10.item[i_item_sk#6] Batched: true, Format: Parquet, Location:...{code}
This problem could get worse when adaptive execution is enabled, because it usually prefers
a large shuffle.partition.
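
To illustrate why the second plan should still be correct, here is a minimal sketch in plain Python (not Spark code). It assumes a toy hash of `key % n` in place of Spark's real hash function, and the sample keys and the helper names `partition` and `partitionwise_join` are hypothetical. It only shows that joining bucket i of one side with partition i of the other gives the same rows as long as both sides use the same partition count, so only the non-bucketed side needs to move.

```python
from collections import defaultdict

NUM_BUCKETS = 4          # bucket count of web_sales_bucketed in the example
SHUFFLE_PARTITIONS = 10  # shuffle.partition in the example


def partition(keys, n):
    """Hash-partition keys into n partitions (toy hash: key mod n)."""
    parts = defaultdict(list)
    for k in keys:
        parts[k % n].append(k)
    return parts


def partitionwise_join(left_parts, right_parts, n):
    """Join partition i of the left side only with partition i of the right side."""
    out = []
    for i in range(n):
        right = right_parts.get(i, [])
        for l in left_parts.get(i, []):
            out.extend((l, r) for r in right if l == r)
    return sorted(out)


# Hypothetical sample keys standing in for ws_item_sk and i_item_sk.
web_sales_keys = [1, 2, 2, 5, 8, 13, 21]
item_keys = [2, 3, 5, 8, 13, 34]

# Reference result: plain inner join without any partitioning.
expected = sorted((l, r) for l in web_sales_keys for r in item_keys if l == r)

# The bucketed table is already laid out in NUM_BUCKETS partitions.
bucketed = partition(web_sales_keys, NUM_BUCKETS)

# Current plan: both sides are repartitioned to SHUFFLE_PARTITIONS.
current = partitionwise_join(
    partition(web_sales_keys, SHUFFLE_PARTITIONS),
    partition(item_keys, SHUFFLE_PARTITIONS),
    SHUFFLE_PARTITIONS,
)

# Proposed plan: keep the bucket layout, shuffle only item into NUM_BUCKETS.
proposed = partitionwise_join(
    bucketed, partition(item_keys, NUM_BUCKETS), NUM_BUCKETS
)

# Rows that have to move in each plan.
shuffled_rows_current = len(web_sales_keys) + len(item_keys)
shuffled_rows_proposed = len(item_keys)
```

In this toy model both plans return the same join result, but the proposed one moves only the item rows. The trade-off is that the join then runs with 4 tasks instead of 10.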

 


> Avoid shuffle for the bucketed table when shuffle.partition > bucket number
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-24343
>                 URL: https://issues.apache.org/jira/browse/SPARK-24343
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: yucai
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

