spark-issues mailing list archives

From "John Zhuge (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-26576) Broadcast hint not applied to partitioned table
Date Thu, 10 Jan 2019 07:49:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-26576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

John Zhuge updated SPARK-26576:
-------------------------------
    Summary: Broadcast hint not applied to partitioned table  (was: Broadcast hint not applied to partitioned Parquet table)

> Broadcast hint not applied to partitioned table
> -----------------------------------------------
>
>                 Key: SPARK-26576
>                 URL: https://issues.apache.org/jira/browse/SPARK-26576
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.2, 2.3.2, 2.4.0
>            Reporter: John Zhuge
>            Priority: Major
>
> The broadcast hint is not applied to a partitioned Parquet table. Below, "SortMergeJoin" is chosen incorrectly and "ResolvedHint (broadcast)" is dropped from the Optimized Logical Plan.
> {noformat}
> scala> spark.sql("CREATE TABLE jzhuge.parquet_with_part (val STRING) PARTITIONED BY (dateint INT) STORED AS parquet")
> scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
> scala> Seq(spark.table("jzhuge.parquet_with_part")).map(df => df.join(broadcast(df), "dateint").explain(true))
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner,List(dateint))
> :- SubqueryAlias `jzhuge`.`parquet_with_part`
> :  +- Relation[val#28,dateint#29] parquet
> +- ResolvedHint (broadcast)
>    +- SubqueryAlias `jzhuge`.`parquet_with_part`
>       +- Relation[val#32,dateint#33] parquet
> == Analyzed Logical Plan ==
> dateint: int, val: string, val: string
> Project [dateint#29, val#28, val#32]
> +- Join Inner, (dateint#29 = dateint#33)
>    :- SubqueryAlias `jzhuge`.`parquet_with_part`
>    :  +- Relation[val#28,dateint#29] parquet
>    +- ResolvedHint (broadcast)
>       +- SubqueryAlias `jzhuge`.`parquet_with_part`
>          +- Relation[val#32,dateint#33] parquet
> == Optimized Logical Plan ==
> Project [dateint#29, val#28, val#32]
> +- Join Inner, (dateint#29 = dateint#33)
>    :- Project [val#28, dateint#29]
>    :  +- Filter isnotnull(dateint#29)
>    :     +- Relation[val#28,dateint#29] parquet
>    +- Project [val#32, dateint#33]
>       +- Filter isnotnull(dateint#33)
>          +- Relation[val#32,dateint#33] parquet
> == Physical Plan ==
> *(5) Project [dateint#29, val#28, val#32]
> +- *(5) SortMergeJoin [dateint#29], [dateint#33], Inner
>    :- *(2) Sort [dateint#29 ASC NULLS FIRST], false, 0
>    :  +- Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
>    :     +- *(1) FileScan parquet jzhuge.parquet_with_part[val#28,dateint#29] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, PartitionFilters: [isnotnull(dateint#29)], PushedFilters: [], ReadSchema: struct<val:string>
>    +- *(4) Sort [dateint#33 ASC NULLS FIRST], false, 0
>       +- ReusedExchange [val#32, dateint#33], Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
> {noformat}
> The broadcast hint is applied correctly to an unpartitioned Parquet table. Below, "BroadcastHashJoin" is chosen as expected.
> {noformat}
> scala> spark.sql("CREATE TABLE jzhuge.parquet_no_part (val STRING, dateint INT) STORED AS parquet")
> scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
> scala> Seq(spark.table("jzhuge.parquet_no_part")).map(df => df.join(broadcast(df), "dateint").explain(true))
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner,List(dateint))
> :- SubqueryAlias `jzhuge`.`parquet_no_part`
> :  +- Relation[val#44,dateint#45] parquet
> +- ResolvedHint (broadcast)
>    +- SubqueryAlias `jzhuge`.`parquet_no_part`
>       +- Relation[val#50,dateint#51] parquet
> == Analyzed Logical Plan ==
> dateint: int, val: string, val: string
> Project [dateint#45, val#44, val#50]
> +- Join Inner, (dateint#45 = dateint#51)
>    :- SubqueryAlias `jzhuge`.`parquet_no_part`
>    :  +- Relation[val#44,dateint#45] parquet
>    +- ResolvedHint (broadcast)
>       +- SubqueryAlias `jzhuge`.`parquet_no_part`
>          +- Relation[val#50,dateint#51] parquet
> == Optimized Logical Plan ==
> Project [dateint#45, val#44, val#50]
> +- Join Inner, (dateint#45 = dateint#51)
>    :- Filter isnotnull(dateint#45)
>    :  +- Relation[val#44,dateint#45] parquet
>    +- ResolvedHint (broadcast)
>       +- Filter isnotnull(dateint#51)
>          +- Relation[val#50,dateint#51] parquet
> == Physical Plan ==
> *(2) Project [dateint#45, val#44, val#50]
> +- *(2) BroadcastHashJoin [dateint#45], [dateint#51], Inner, BuildRight
>    :- *(2) Project [val#44, dateint#45]
>    :  +- *(2) Filter isnotnull(dateint#45)
>    :     +- *(2) FileScan parquet jzhuge.parquet_no_part[val#44,dateint#45] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
>    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
>       +- *(1) Project [val#50, dateint#51]
>          +- *(1) Filter isnotnull(dateint#51)
>             +- *(1) FileScan parquet jzhuge.parquet_no_part[val#50,dateint#51] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
> {noformat}
> A similar issue was observed with a partitioned ORC table; SequenceFile tables are unaffected.
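> The partitioned ORC case can presumably be reproduced by mirroring the Parquet steps above; this is an untested sketch (the table name is hypothetical):
> {noformat}
> scala> spark.sql("CREATE TABLE jzhuge.orc_with_part (val STRING) PARTITIONED BY (dateint INT) STORED AS orc")
> scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
> scala> Seq(spark.table("jzhuge.orc_with_part")).map(df => df.join(broadcast(df), "dateint").explain(true))
> {noformat}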



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

