Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A2FC5200D2C for ; Sun, 15 Oct 2017 08:08:05 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A170E160BE6; Sun, 15 Oct 2017 06:08:05 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BDB121609D5 for ; Sun, 15 Oct 2017 08:08:04 +0200 (CEST) Received: (qmail 96533 invoked by uid 500); 15 Oct 2017 06:08:03 -0000 Mailing-List: contact issues-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list issues@spark.apache.org Received: (qmail 96524 invoked by uid 99); 15 Oct 2017 06:08:03 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 15 Oct 2017 06:08:03 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id EF86F1A152F for ; Sun, 15 Oct 2017 06:08:02 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -100.002 X-Spam-Level: X-Spam-Status: No, score=-100.002 tagged_above=-999 required=6.31 tests=[RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id Z7Tt0dHyXqBe for ; Sun, 15 Oct 2017 06:08:01 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id 1770F5FAF3 for ; Sun, 15 Oct 2017 06:08:01 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 7C609E045B for ; Sun, 15 Oct 2017 06:08:00 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 31A5424360 for ; Sun, 15 Oct 2017 06:08:00 +0000 (UTC) Date: Sun, 15 Oct 2017 06:08:00 +0000 (UTC) From: "Apache Spark (JIRA)" To: issues@spark.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (SPARK-22223) ObjectHashAggregate introduces unnecessary shuffle MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Sun, 15 Oct 2017 06:08:05 -0000 [ https://issues.apache.org/jira/browse/SPARK-22223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16205038#comment-16205038 ] Apache Spark commented on SPARK-22223: -------------------------------------- User 'viirya' has created a pull request for this issue: https://github.com/apache/spark/pull/19501 > ObjectHashAggregate introduces unnecessary shuffle > -------------------------------------------------- > > Key: SPARK-22223 > URL: https://issues.apache.org/jira/browse/SPARK-22223 > Project: Spark > Issue Type: Bug > Components: Optimizer > Affects Versions: 2.2.0 > Environment: Spark 2.2.0 and following. > {{spark.sql.execution.useObjectHashAggregateExec = true}} > Reporter: Michele Costantino Soccio > > Since Spark 2.2 the {{groupBy}} plus {{collect_list}} makes use of unnecessary shuffle when the partitions at previous step are based on looser criteria than the current {{groupBy}}. > For example: > {code:java} > //sample data from https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data > //Read the data and repartitions by "Country" > val retailDF = spark.sql("Select * from online_retail") > .repartition(col("Country")) > //Group the data and collect. > val aggregatedDF = retailDF > .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)")) > .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate") > .agg(collect_list("Good").as("Goods")) > .withColumn("Invoice", expr("(InvoiceNo, InvoiceDate, Goods)")) > .groupBy("Country", "CustomerID") > .agg(collect_list("Invoice").as("Invoices")) > .withColumn("Customer", expr("(CustomerID, Invoices)")) > .groupBy("Country") > .agg(collect_list("Customer").as("Customers")) > {code} > Without disabling the {{ObjectHashAggregate}} one gets the following physical plan: > {noformat} > == Physical Plan == > ObjectHashAggregate(keys=[Country#14], functions=[finalmerge_collect_list(merge buf#317) AS collect_list(Customer#299, 0, 0)#310]) > +- Exchange hashpartitioning(Country#14, 200) > +- ObjectHashAggregate(keys=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#317]) > +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299] > +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293]) > +- Exchange hashpartitioning(Country#14, CustomerID#13, 200) > +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319]) > +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278] > +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270]) > +- Exchange hashpartitioning(Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11, 200) > +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321]) > +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249] > +- Exchange hashpartitioning(Country#14, 200) > +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct {noformat} > With Spark 2.1.0 or when {{ObjectHashAggregate}} is disabled, one gets a more efficient: > {noformat} > == Physical Plan == > SortAggregate(key=[Country#14], functions=[finalmerge_collect_list(merge buf#3834) AS collect_list(Customer#299, 0, 0)#310]) > +- SortAggregate(key=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#3834]) > +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299] > +- SortAggregate(key=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293]) > +- SortAggregate(key=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319]) > +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278] > +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270]) > +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321]) > +- *Sort [Country#14 ASC NULLS FIRST, CustomerID#13 ASC NULLS FIRST, InvoiceNo#7 ASC NULLS FIRST, InvoiceDate#11 ASC NULLS FIRST], false, 0 > +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249] > +- Exchange hashpartitioning(Country#14, 200) > +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct {noformat} > In this example, a quick run on DataBricks Notebook showed that by manually disabling the {{ObjectHashAggregate}} one gets around 16s execution time versus the 25s needed when {{ObjectHashAggregate}} is enabled. > The use of the {{ObjectHashAggregate}} in the {{groupBy}} was introduced with SPARK-17949. -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org For additional commands, e-mail: issues-help@spark.apache.org