spark-issues mailing list archives

From "Tejas Patil (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-19122) Unnecessary shuffle+sort added if join predicates ordering differ from bucketing and sorting order
Date Fri, 12 May 2017 15:12:04 GMT

    [ https://issues.apache.org/jira/browse/SPARK-19122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16008266#comment-16008266
] 

Tejas Patil commented on SPARK-19122:
-------------------------------------

[~cloud_fan]: 
- The test case in the [associated PR (#16985)|https://github.com/apache/spark/pull/16985] fails without the fix.
- I am able to repro this issue over the master branch. Can you share the exact steps you used to repro? I am guessing that `spark.sql.autoBroadcastJoinThreshold` needs to be overridden, otherwise it won't pick sort merge join. Here are my exact steps to repro the example in the jira description:

{noformat}
$ git log
commit 92ea7fd7b6cd4641b2f02b97105835029ddadc5f
Author: Takeshi Yamamuro <yamamuro@apache.org>
Date:   Fri May 12 20:48:30 2017 +0800

build/sbt -Pyarn -Phadoop-2.4 -Phive package assembly/package
export SPARK_PREPEND_CLASSES=true
SPARK_LOCAL_IP=127.0.0.1 ./bin/spark-shell
{noformat}

In the Spark shell:
{noformat}
import org.apache.spark.sql._
val hc = SparkSession.builder.master("local").enableHiveSupport.getOrCreate()
hc.sql(" DROP TABLE table1 ")
hc.sql(" DROP TABLE table2 ")
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)

hc.sql("SET spark.sql.autoBroadcastJoinThreshold=1")
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table1")
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table2")


scala> hc.sql("SELECT * FROM table1 a JOIN table2 b ON a.j=b.j AND a.k=b.k").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, (('a.j = 'b.j) && ('a.k = 'b.k))
   :- 'SubqueryAlias a
   :  +- 'UnresolvedRelation `table1`
   +- 'SubqueryAlias b
      +- 'UnresolvedRelation `table2`

== Analyzed Logical Plan ==
i: int, j: int, k: string, i: int, j: int, k: string
Project [i#86, j#87, k#88, i#89, j#90, k#91]
+- Join Inner, ((j#87 = j#90) && (k#88 = k#91))
   :- SubqueryAlias a
   :  +- SubqueryAlias table1
   :     +- Relation[i#86,j#87,k#88] orc
   +- SubqueryAlias b
      +- SubqueryAlias table2
         +- Relation[i#89,j#90,k#91] orc

== Optimized Logical Plan ==
Join Inner, ((j#87 = j#90) && (k#88 = k#91))
:- Filter (isnotnull(j#87) && isnotnull(k#88))
:  +- Relation[i#86,j#87,k#88] orc
+- Filter (isnotnull(j#90) && isnotnull(k#91))
   +- Relation[i#89,j#90,k#91] orc

== Physical Plan ==
*SortMergeJoin [j#87, k#88], [j#90, k#91], Inner
:- *Project [i#86, j#87, k#88]
:  +- *Filter (isnotnull(j#87) && isnotnull(k#88))
:     +- *FileScan orc default.table1[i#86,j#87,k#88] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/Users/tejasp/Desktop/dev/apache-hive-1.2.1-bin/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#89, j#90, k#91]
   +- *Filter (isnotnull(j#90) && isnotnull(k#91))
      +- *FileScan orc default.table2[i#89,j#90,k#91] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/Users/tejasp/Desktop/dev/apache-hive-1.2.1-bin/warehouse/table2], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>



scala> hc.sql("SELECT * FROM table1 a JOIN table2 b ON a.k=b.k AND a.j=b.j").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, (('a.k = 'b.k) && ('a.j = 'b.j))
   :- 'SubqueryAlias a
   :  +- 'UnresolvedRelation `table1`
   +- 'SubqueryAlias b
      +- 'UnresolvedRelation `table2`

== Analyzed Logical Plan ==
i: int, j: int, k: string, i: int, j: int, k: string
Project [i#106, j#107, k#108, i#109, j#110, k#111]
+- Join Inner, ((k#108 = k#111) && (j#107 = j#110))
   :- SubqueryAlias a
   :  +- SubqueryAlias table1
   :     +- Relation[i#106,j#107,k#108] orc
   +- SubqueryAlias b
      +- SubqueryAlias table2
         +- Relation[i#109,j#110,k#111] orc

== Optimized Logical Plan ==
Join Inner, ((k#108 = k#111) && (j#107 = j#110))
:- Filter (isnotnull(j#107) && isnotnull(k#108))
:  +- Relation[i#106,j#107,k#108] orc
+- Filter (isnotnull(k#111) && isnotnull(j#110))
   +- Relation[i#109,j#110,k#111] orc

== Physical Plan ==
*SortMergeJoin [k#108, j#107], [k#111, j#110], Inner
:- *Sort [k#108 ASC NULLS FIRST, j#107 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(k#108, j#107, 200)
:     +- *Project [i#106, j#107, k#108]
:        +- *Filter (isnotnull(j#107) && isnotnull(k#108))
:           +- *FileScan orc default.table1[i#106,j#107,k#108] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/Users/tejasp/Desktop/dev/apache-hive-1.2.1-bin/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [k#111 ASC NULLS FIRST, j#110 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(k#111, j#110, 200)
      +- *Project [i#109, j#110, k#111]
         +- *Filter (isnotnull(k#111) && isnotnull(j#110))
            +- *FileScan orc default.table2[i#109,j#110,k#111] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/Users/tejasp/Desktop/dev/apache-hive-1.2.1-bin/warehouse/table2], PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>

{noformat}
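For reference, the mismatch boils down to an order-sensitive vs. order-insensitive comparison of key sequences. This is a minimal standalone sketch in plain Scala (not Spark's actual planner code; the value names are made up) of why the second query gets the extra Exchange + Sort even though both queries join on the same columns:

{code}
// Hypothetical sketch: the tables are bucketed and sorted by [j, k], but the
// second query lists its join keys as [k, j]. An order-sensitive comparison
// of the key sequences fails even though both refer to exactly the same
// columns, so a shuffle (Exchange) and a Sort get planned.
val bucketKeys = Seq("j", "k") // columns passed to bucketBy/sortBy
val joinKeys = Seq("k", "j")   // keys as written in the ON clause

// Order-sensitive check: fails, so shuffle+sort are added.
val orderSensitive = bucketKeys == joinKeys // false

// Order-insensitive check: succeeds; for the partitioning side, the
// existing bucketing could be reused if keys were matched this way.
val orderInsensitive = bucketKeys.toSet == joinKeys.toSet // true
{code}

(Sort order itself is genuinely order-sensitive, so the actual fix needs to pick a canonical key ordering rather than just compare sets; the above only illustrates why the current comparison is stricter than necessary.)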

> Unnecessary shuffle+sort added if join predicates ordering differ from bucketing and sorting order
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19122
>                 URL: https://issues.apache.org/jira/browse/SPARK-19122
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2, 2.1.0
>            Reporter: Tejas Patil
>
> `table1` and `table2` are sorted and bucketed on columns `j` and `k` (in respective order)
> This is how they are generated:
> {code}
> val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
> df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table1")
> df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table2")
> {code}
> Now, if the join predicates are specified in the query in the *same* order as the bucketing and sort order, there is no shuffle and sort.
> {code}
> scala> hc.sql("SELECT * FROM table1 a JOIN table2 b ON a.j=b.j AND a.k=b.k").explain(true)
> == Physical Plan ==
> *SortMergeJoin [j#61, k#62], [j#100, k#101], Inner
> :- *Project [i#60, j#61, k#62]
> :  +- *Filter (isnotnull(k#62) && isnotnull(j#61))
> :     +- *FileScan orc default.table1[i#60,j#61,k#62] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table1], PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
> +- *Project [i#99, j#100, k#101]
>    +- *Filter (isnotnull(j#100) && isnotnull(k#101))
>       +- *FileScan orc default.table2[i#99,j#100,k#101] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table2], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
> {code}
> The same query with the join predicates in a *different* order from the bucketing and sort order leads to an extra shuffle and sort being introduced:
> {code}
> scala> hc.sql("SELECT * FROM table1 a JOIN table2 b ON a.k=b.k AND a.j=b.j ").explain(true)
> == Physical Plan ==
> *SortMergeJoin [k#62, j#61], [k#101, j#100], Inner
> :- *Sort [k#62 ASC NULLS FIRST, j#61 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(k#62, j#61, 200)
> :     +- *Project [i#60, j#61, k#62]
> :        +- *Filter (isnotnull(k#62) && isnotnull(j#61))
> :           +- *FileScan orc default.table1[i#60,j#61,k#62] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table1], PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
> +- *Sort [k#101 ASC NULLS FIRST, j#100 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(k#101, j#100, 200)
>       +- *Project [i#99, j#100, k#101]
>          +- *Filter (isnotnull(j#100) && isnotnull(k#101))
>             +- *FileScan orc default.table2[i#99,j#100,k#101] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/table2], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
