spark-issues mailing list archives

From "Fernando Pereira (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-22276) Unnecessary repartitioning
Date Mon, 16 Oct 2017 10:15:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-22276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fernando Pereira updated SPARK-22276:
-------------------------------------
    Description: 
When a DataFrame is sorted, it is partitioned with a RangePartitioner.
If we later aggregate by exactly the same fields the sort was applied to, the plan contains a new
(apparently useless) Exchange that repartitions the data with a HashPartitioner.
In my use case this groupBy exchange is still very costly, because the aggregate function does not
reduce the data volume.
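
To illustrate why the second exchange looks redundant, here is a minimal pure-Python sketch. It is a toy model, not Spark's RangePartitioner: the function names and the split points `[2, 4]` are made up for illustration. It only shows that range partitioning on a key already places all rows with the same key in the same partition, whereas an arbitrary split (round-robin here) does not.

```python
from bisect import bisect_left
from collections import defaultdict


def range_partition(rows, key, bounds):
    """Toy range partitioner: bounds are sorted, inclusive upper bounds."""
    parts = defaultdict(list)
    for row in rows:
        # Index of the first bound >= key value; values above all bounds
        # fall into the last partition, len(bounds).
        parts[bisect_left(bounds, row[key])].append(row)
    return dict(parts)


def keys_are_colocated(parts, key):
    """True if no key value appears in more than one partition."""
    owner = {}
    for pid, rows in parts.items():
        for row in rows:
            if owner.setdefault(row[key], pid) != pid:
                return False
    return True


# Hypothetical data with the same column names as the example in this issue.
rows = [{"pre_gid": i, "post_gid": i % 7} for i in range(50)]

by_range = range_partition(rows, "post_gid", bounds=[2, 4])
round_robin = {i: rows[i::3] for i in range(3)}

print(keys_are_colocated(by_range, "post_gid"))     # range partitioned
print(keys_are_colocated(round_robin, "post_gid"))  # arbitrary split
```

In this model a per-partition groupBy on `post_gid` would already see every row of each group, which is the property the question below relies on.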

Is there any reason why groupBy always shuffles data, or could this be improved?
Is there currently a way to work around this, without resorting to mapPartitions?

Example

{code}
from pyspark.sql import functions as F

# nrn_vals is a DataFrame loaded from a Parquet file
nrn_vals.printSchema()
(nrn_vals
 .sort("post_gid")
 .groupBy("post_gid")
 .agg(F.collect_list("pre_gid").alias("pre_gids"))
).explain()
{code}
This outputs the following:
{code}
root
 |-- pre_gid: integer (nullable = true)
 |-- post_gid: integer (nullable = true)
 |-- floatvec: array (nullable = false)
 |    |-- element: float (containsNull = true)

== Physical Plan ==
ObjectHashAggregate(keys=[post_gid#1386], functions=[collect_list(pre_gid#1385, 0, 0)])
+- Exchange hashpartitioning(post_gid#1386, 1)
   +- ObjectHashAggregate(keys=[post_gid#1386], functions=[partial_collect_list(pre_gid#1385, 0, 0)])
      +- *Sort [post_gid#1386 ASC NULLS FIRST], true, 0
         +- Exchange rangepartitioning(post_gid#1386 ASC NULLS FIRST, 1)
            +- *FileScan parquet [pre_gid#1385,post_gid#1386] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/media/psf/Home/dev/Functionalizer/pyspark/spykfunc_output/extended_touche..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<pre_gid:int,post_gid:int>

{code}
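
The duplicated shuffle can also be spotted mechanically. The following is a small sketch, not part of Spark: `find_exchanges` and the regular expression are hypothetical helpers, and the plan text is the relevant part of the output above pasted in as a string. It lists each Exchange operator in a physical plan with its partitioning scheme and first partitioning attribute.

```python
import re

# Top of the physical plan from this issue, copied as plain text.
PLAN = """\
ObjectHashAggregate(keys=[post_gid#1386], functions=[collect_list(pre_gid#1385, 0, 0)])
+- Exchange hashpartitioning(post_gid#1386, 1)
   +- ObjectHashAggregate(keys=[post_gid#1386], functions=[partial_collect_list(pre_gid#1385, 0, 0)])
      +- *Sort [post_gid#1386 ASC NULLS FIRST], true, 0
         +- Exchange rangepartitioning(post_gid#1386 ASC NULLS FIRST, 1)
"""

# Matches "Exchange <scheme>(<attribute>#<id>" and captures scheme and attribute.
EXCHANGE = re.compile(r"Exchange\s+(\w+)\((\w+#\d+)")


def find_exchanges(plan_text):
    """Return (partitioning scheme, first attribute) per Exchange, top-down."""
    return [m.groups() for m in EXCHANGE.finditer(plan_text)]


exchanges = find_exchanges(PLAN)
for scheme, attribute in exchanges:
    print(scheme, attribute)
```

Both Exchange operators found this way partition on the same attribute, `post_gid#1386`, which is the situation this issue describes.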


  was:
When a dataframe is sorted it is partitioned with a RangePartitioner.
If later we aggregate by the exact same fields over which sort was applied there is a new
(apparently useless) Exchange repartitioning by a HashPartitioner.
In my use case the groupBy exchange is still very costly as the aggregate function won't reduce
the data volume.

Is there any reason why groupBy always shuffles data, or could this be improved? 
Is there currently a way to workaround for the moment, without going to mapPartitions?


> Unnecessary repartitioning
> --------------------------
>
>                 Key: SPARK-22276
>                 URL: https://issues.apache.org/jira/browse/SPARK-22276
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.2.0
>            Reporter: Fernando Pereira



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


