spark-issues mailing list archives

From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22223) ObjectHashAggregate introduces unnecessary shuffle
Date Sun, 15 Oct 2017 06:08:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16205038#comment-16205038 ]

Apache Spark commented on SPARK-22223:
--------------------------------------

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/19501

> ObjectHashAggregate introduces unnecessary shuffle
> --------------------------------------------------
>
>                 Key: SPARK-22223
>                 URL: https://issues.apache.org/jira/browse/SPARK-22223
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.2.0
>         Environment: Spark 2.2.0 and later.
> {{spark.sql.execution.useObjectHashAggregateExec = true}}
>            Reporter: Michele Costantino Soccio
>
> Since Spark 2.2, {{groupBy}} plus {{collect_list}} makes use of an unnecessary shuffle when the partitioning from the previous step is based on looser criteria than the current {{groupBy}} keys.
> For example:
> {code:java}
> // Sample data from https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data
> import org.apache.spark.sql.functions.{col, collect_list, expr}
>
> // Read the data and repartition by "Country".
> val retailDF = spark.sql("SELECT * FROM online_retail")
>   .repartition(col("Country"))
>
> // Group by successively coarser keys, collecting nested structs at each level.
> val aggregatedDF = retailDF
>   .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)"))
>   .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate")
>   .agg(collect_list("Good").as("Goods"))
>   .withColumn("Invoice", expr("(InvoiceNo, InvoiceDate, Goods)"))
>   .groupBy("Country", "CustomerID")
>   .agg(collect_list("Invoice").as("Invoices"))
>   .withColumn("Customer", expr("(CustomerID, Invoices)"))
>   .groupBy("Country")
>   .agg(collect_list("Customer").as("Customers"))
> {code}
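> The plans below can be reproduced by printing the physical plan of {{aggregatedDF}}; a minimal sketch, assuming the code above has already been run:
> {code:java}
> // Print the physical plan the optimizer selected for aggregatedDF.
> aggregatedDF.explain()
> {code}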
> Without disabling {{ObjectHashAggregate}}, one gets the following physical plan:
> {noformat}
> == Physical Plan ==
> ObjectHashAggregate(keys=[Country#14], functions=[finalmerge_collect_list(merge buf#317) AS collect_list(Customer#299, 0, 0)#310])
> +- Exchange hashpartitioning(Country#14, 200)
>    +- ObjectHashAggregate(keys=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#317])
>       +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
>          +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
>             +- Exchange hashpartitioning(Country#14, CustomerID#13, 200)
>                +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
>                   +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
>                      +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
>                         +- Exchange hashpartitioning(Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11, 200)
>                            +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
>                               +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
>                                  +- Exchange hashpartitioning(Country#14, 200)
>                                     +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
> {noformat}
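> For comparison, {{ObjectHashAggregate}} can be turned off through the configuration flag named in the Environment field above; a minimal sketch:
> {code:java}
> // Disable the object hash aggregate so the planner falls back to SortAggregate.
> spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "false")
> {code}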
> With Spark 2.1.0, or when {{ObjectHashAggregate}} is disabled, one gets a more efficient plan:
> {noformat}
> == Physical Plan ==
> SortAggregate(key=[Country#14], functions=[finalmerge_collect_list(merge buf#3834) AS collect_list(Customer#299, 0, 0)#310])
> +- SortAggregate(key=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#3834])
>    +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
>       +- SortAggregate(key=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
>          +- SortAggregate(key=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
>             +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
>                +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
>                   +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
>                      +- *Sort [Country#14 ASC NULLS FIRST, CustomerID#13 ASC NULLS FIRST, InvoiceNo#7 ASC NULLS FIRST, InvoiceDate#11 ASC NULLS FIRST], false, 0
>                         +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
>                            +- Exchange hashpartitioning(Country#14, 200)
>                               +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
> {noformat}
> In this example, a quick run in a Databricks notebook showed that manually disabling {{ObjectHashAggregate}} gives an execution time of around 16 s, versus the 25 s needed when {{ObjectHashAggregate}} is enabled.
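> A minimal sketch for reproducing the timing comparison, assuming the DataFrame above ({{spark.time}} runs the given block and prints the elapsed wall-clock time):
> {code:java}
> // Trigger the full aggregation and measure how long it takes.
> spark.time { aggregatedDF.count() }
> {code}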
> The use of {{ObjectHashAggregate}} in {{groupBy}} aggregations was introduced in SPARK-17949.


