spark-issues mailing list archives

From "Mathieu DESPRIEE (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-23220) broadcast hint not applied in a streaming left anti join
Date Sat, 27 Jan 2018 22:06:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16342339#comment-16342339 ]

Mathieu DESPRIEE edited comment on SPARK-23220 at 1/27/18 10:05 PM:
--------------------------------------------------------------------

Found that the problem is related to the persist() operation. When the right-hand side of
the join is cached, there is an InMemoryRelation in the plan.
The sizeInBytes of InMemoryRelation defaults to spark.sql.defaultSizeInBytes, which is Long.MaxValue.
In turn, canBroadcast() in the JoinSelection strategy returns false, which prevents the broadcast join.
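As a rough illustration, the size check can be modeled in isolation. The sketch below is a simplified, hypothetical model of the JoinSelection decision, not Spark's actual code: the plan's size estimate is compared against spark.sql.autoBroadcastJoinThreshold (10 MB by default), so a cached relation that reports the Long.MaxValue fallback can never qualify.

```scala
// Simplified model (NOT Spark's actual code) of the size check made by the
// JoinSelection strategy when it decides whether to plan a broadcast join.
object JoinSelectionModel {
  // Default of spark.sql.autoBroadcastJoinThreshold: 10 MB.
  val autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024
  // Default of spark.sql.defaultSizeInBytes, used when stats are unknown.
  val defaultSizeInBytes: Long = Long.MaxValue

  def canBroadcast(sizeInBytes: BigInt): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= BigInt(autoBroadcastJoinThreshold)

  def main(args: Array[String]): Unit = {
    // A small relation with known stats is eligible for broadcast...
    println(canBroadcast(BigInt(4096)))               // true
    // ...but a cached relation falling back to the default size is not.
    println(canBroadcast(BigInt(defaultSizeInBytes))) // false
  }
}
```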

Without the persist(), the plan is completely different:
{noformat}
== Optimized Logical Plan ==
Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- Join LeftAnti, (hostname#44 = hostname#13)
   :- Relation[app_id#40,event#41,event_id#42,timestamp#43,hostname#44] json
   +- ResolvedHint isBroadcastable=true
      +- LocalRelation [hostname#13]

== Physical Plan ==
*Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- *BroadcastHashJoin [hostname#44], [hostname#13], LeftAnti, BuildRight
   :- *FileScan json [app_id#40,event#41,event_id#42,timestamp#43,hostname#44] Batched: false,
Format: JSON, Location: InMemoryFileIndex[file:/tmp/events628533427690545694/data], PartitionFilters:
[], PushedFilters: [], ReadSchema: struct<app_id:string,event:string,event_id:string,timestamp:string,hostname:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- LocalTableScan [hostname#13]
{noformat}

The problem of stats in InMemoryRelation is addressed by SPARK-22673. I'll check whether it
fixes this bug as well.
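The hint-loss mechanism described above can also be sketched as a toy model (again, not Spark's code, and the Stats shape here is hypothetical): a hinted plan should qualify for broadcast regardless of size, but when persist() replaces the hinted subtree with an InMemoryRelation, the flag is lost and the pessimistic size estimate decides instead.

```scala
// Toy model (not Spark code) of how persist() can drop a broadcast hint:
// stats carry an optional broadcast flag alongside the size estimate.
object HintLossModel {
  case class Stats(sizeInBytes: BigInt, isBroadcastable: Boolean = false)

  val threshold: BigInt = BigInt(10L * 1024 * 1024) // autoBroadcastJoinThreshold

  // A hinted plan qualifies regardless of size; otherwise size decides.
  def canBroadcast(stats: Stats): Boolean =
    stats.isBroadcastable ||
      (stats.sizeInBytes >= 0 && stats.sizeInBytes <= threshold)

  def main(args: Array[String]): Unit = {
    // ResolvedHint marks the LocalRelation as broadcastable...
    val hinted = Stats(BigInt(Long.MaxValue), isBroadcastable = true)
    // ...but the InMemoryRelation inserted by persist() keeps the pessimistic
    // default size and loses the flag, so the size check decides, and fails.
    val cached = Stats(BigInt(Long.MaxValue))
    println(canBroadcast(hinted)) // true
    println(canBroadcast(cached)) // false
  }
}
```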



> broadcast hint not applied in a streaming left anti join
> --------------------------------------------------------
>
>                 Key: SPARK-23220
>                 URL: https://issues.apache.org/jira/browse/SPARK-23220
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.1
>            Reporter: Mathieu DESPRIEE
>            Priority: Major
>         Attachments: Screenshot from 2018-01-25 17-32-45.png
>
>
> We have a structured streaming app doing a left anti-join between a stream and a static
dataframe. The static dataframe is quite small (a few hundred rows), but the query plan by default
is a sort-merge join.
>   
>  Sometimes we need to re-process historical data, so we feed the same app with a FileSource
pointing to our S3 storage with all archives. In that situation, the first mini-batch is quite
heavy (several hundred thousand input files), and the time spent in the sort-merge join is
unacceptable. Additionally, the data is highly skewed, so partition sizes are completely uneven,
and executors tend to crash with OOMs.
> I tried to switch to a broadcast join, but Spark still applies a sort-merge join.
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is:
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
> <-- snip -->
>          +- Join LeftAnti, (hostname#3584 = hostname#190)
>             :- Project [app_id, ...
> <-- snip -->
>                                        +- StreamingExecutionRelation FileStreamSource[s3://xxxx{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}],
[app_id
>  <--snip--> ... 62 more fields]
>             +- ResolvedHint isBroadcastable=true
>                +- Relation[hostname#190,descr#191] RedshiftRelation("PUBLIC"."hostname_filter")
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
