spark-issues mailing list archives

From "Liang-Chi Hsieh (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table
Date Thu, 14 Jun 2018 02:58:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511924#comment-16511924 ]

Liang-Chi Hsieh commented on SPARK-24528:
-----------------------------------------

Btw, I think the complete and reproducible examples should be the two below. With spark.sql.shuffle.partitions=3 the repartition on "key" lines up with the 3 buckets, so every bucket is written as a single file and no extra Sort is planned; with spark.sql.shuffle.partitions=2 some buckets end up with more than one file, and a Sort shows up in the plan:

{code}
spark.sql("set spark.sql.shuffle.partitions=3")
val N = 1000
spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as t2")
  .repartition(col("key"))
  .write.mode("overwrite")
  .bucketBy(3, "key")
  .sortBy("key", "t1")
  .saveAsTable("a1")
spark.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#156L], functions=[max(named_struct(t1, t1#157L, key, key#156L, t1, t1#157L, t2, t2#158L))])
+- SortAggregate(key=[key#156L], functions=[partial_max(named_struct(t1, t1#157L, key, key#156L, t1, t1#157L, t2, t2#158L))])
   +- *(1) FileScan parquet default.a1[key#156L,t1#157L,t2#158L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>, SelectedBucketsCount: 3 out of 3
{code}

{code}
spark.sql("set spark.sql.shuffle.partitions=2")
val N = 1000
spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as t2")
  .repartition(col("key"))
  .write.mode("overwrite")
  .bucketBy(3, "key")
  .sortBy("key", "t1")
  .saveAsTable("a1")
spark.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#126L], functions=[max(named_struct(t1, t1#127L, key, key#126L, t1, t1#127L, t2, t2#128L))])
+- SortAggregate(key=[key#126L], functions=[partial_max(named_struct(t1, t1#127L, key, key#126L, t1, t1#127L, t2, t2#128L))])
   +- *(1) Sort [key#126L ASC NULLS FIRST], false, 0
      +- *(1) FileScan parquet default.a1[key#126L,t1#127L,t2#128L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>, SelectedBucketsCount: 3 out of 3
{code}
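
A quick way to see why the two plans differ is to count the data files each run produced per bucket. This is only a rough sketch: the relative spark-warehouse path and the "..._<bucketId>.c000.snappy.parquet" file-name suffix are assumptions about a default local run, not a public API.

{code}
// Sketch: group the data files of table a1 by the bucket id encoded in the file name.
import java.io.File
import scala.util.matching.Regex

val bucketIdRe: Regex = ".*_(\\d+)(?:\\..*)?$".r   // e.g. part-00000-<uuid>_00002.c000.snappy.parquet -> bucket 2

val filesPerBucket = new File("spark-warehouse/a1")   // assumed local warehouse location
  .listFiles
  .map(_.getName)
  .filter(_.startsWith("part-"))
  .groupBy(name => bucketIdRe.findFirstMatchIn(name).map(_.group(1).toInt).getOrElse(-1))
  .mapValues(_.length)

// Expected: one file per bucket after the shuffle.partitions=3 run (the scan can report
// its sort order, so no Sort node), but multiple files per bucket after the
// shuffle.partitions=2 run, which makes the planner add the Sort seen above.
println(filesPerBucket)
{code}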

> Missing optimization for Aggregations/Windowing on a bucketed table
> -------------------------------------------------------------------
>
>                 Key: SPARK-24528
>                 URL: https://issues.apache.org/jira/browse/SPARK-24528
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Ohad Raviv
>            Priority: Major
>
> Closely related to SPARK-24410, we're trying to optimize a very common use case we have: getting the most recently updated row per id from a fact table.
> We save the table bucketed and sorted to skip the shuffle stage, but we still "waste" time on the Sort operator even though the data is already sorted.
> Here's a good example:
> {code:java}
> sparkSession.range(N).selectExpr(
>     "id as key",
>     "id % 2 as t1",
>     "id % 3 as t2")
>   .repartition(col("key"))
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("key", "t1")
>   .saveAsTable("a1")
> {code}
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
>    +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: ...
> {code}
>  
> And here's a bad, but more realistic, example:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
>    +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
>       +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, Format: Parquet, Location: ...
> {code}
>  
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
>     files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
> {code}
> So the code obviously avoids dealing with this situation for now: as soon as any bucket consists of more than one file, the scan simply stops reporting a sort order.
> Could you think of a way to solve this or bypass it? (One possible workaround is sketched below.)
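> One workaround that seems to help in practice, sketched below: repartition on the bucket column into exactly as many partitions as buckets before writing, so every bucket is written as a single file and the single-file-per-bucket check above passes. This relies on the write-side hash partitioning lining up with the bucket hashing, which is an observation rather than a documented contract.
> {code:java}
> // Sketch of a workaround (not a documented guarantee): one output partition per bucket,
> // so each bucket becomes a single file, the scan keeps its sort order, and the planner
> // can drop the extra Sort.
> sparkSession.range(N).selectExpr(
>     "id as key",
>     "id % 2 as t1",
>     "id % 3 as t2")
>   .repartition(3, col("key"))   // match the bucket count instead of relying on spark.sql.shuffle.partitions
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("key", "t1")
>   .saveAsTable("a1")
> {code}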


