spark-issues mailing list archives

From "Henry Robinson (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
Date Fri, 03 Nov 2017 15:36:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237778#comment-16237778
] 

Henry Robinson commented on SPARK-22211:
----------------------------------------

[~smilegator] - sounds good! What will your approach be? I wasn't able to see a safe way to
push the limit through the join without either a more invasive rewrite or restricting the
set of join operators used for a full outer join (FOJ).

> LimitPushDown optimization for FullOuterJoin generates wrong results
> --------------------------------------------------------------------
>
>                 Key: SPARK-22211
>                 URL: https://issues.apache.org/jira/browse/SPARK-22211
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>         Environment: on community.cloud.databricks.com
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>            Reporter: Benyi Wang
>            Priority: Major
>
> LimitPushDown pushes LocalLimit to one side of a FullOuterJoin, but this can produce
> a wrong result.
> Assume we use limit(1), LocalLimit is pushed to the left side, and id=999 is selected,
> while the right side has 100K rows including 999. The join result will be:
> - one row (999, 999)
> - the remaining rows, each of the form (null, xxx)
> Once you call show(), the row (999, 999) has only a 1/100000 chance of being selected by
> CollectLimit.
> A correct optimization might be to
> - push down the limit,
> - but convert the join to a broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/6888856075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 100000).toDF("id")
> val dr = shuffle(1 to 100000).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>    :- *Sort [id#10 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#10, 200)
>    :     +- *LocalLimit 1
>    :        +- LocalTableScan [id#10]
>    +- *Sort [id#16 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(id#16, 200)
>          +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> +----+---+
> |id  |id |
> +----+---+
> |null|148|
> +----+---+
> {code}
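
The failure described in the quoted report can be reproduced without Spark. The following is a minimal sketch in plain Scala collections, not Spark's implementation: the object and function names are hypothetical, ids are assumed unique on each side, and `take(1)` stands in for whichever row LocalLimit happens to keep. It counts the rows that appear after the limit is pushed to the left side but do not exist in the true full outer join, and runs the same check for the reporter's suggested left outer join rewrite.

```scala
object LimitPushDownSketch {
  // A joined row: (left id, right id); None stands in for SQL NULL.
  type Row = (Option[Int], Option[Int])

  // Left outer join on id equality; ids are assumed unique on each side.
  def leftOuterJoin(left: Seq[Int], right: Seq[Int]): Seq[Row] = {
    val rightIds = right.toSet
    left.map { l =>
      val matched: Option[Int] = if (rightIds(l)) Some(l) else None
      (Option(l), matched)
    }
  }

  // Full outer join = left outer join plus the right rows that found no match.
  def fullOuterJoin(left: Seq[Int], right: Seq[Int]): Seq[Row] = {
    val leftIds = left.toSet
    val rightOnly: Seq[Row] =
      right.filterNot(leftIds).map(r => (Option.empty[Int], Option(r)))
    leftOuterJoin(left, right) ++ rightOnly
  }

  // Limit pushed to the left side of the full outer join: count the output
  // rows that are not part of the true join result.
  def invalidAfterPushdown(n: Int): Int = {
    val ids = 1 to n
    val truth = fullOuterJoin(ids, ids).toSet
    fullOuterJoin(ids.take(1), ids).count(row => !truth(row))
  }

  // The reporter's suggestion: limit the left side, then use a left outer join.
  def invalidAfterLeftOuter(n: Int): Int = {
    val ids = 1 to n
    val truth = fullOuterJoin(ids, ids).toSet
    leftOuterJoin(ids.take(1), ids).count(row => !truth(row))
  }

  def main(args: Array[String]): Unit = {
    val n = 1000 // scaled down from the 100K rows in the report
    println(s"invalid rows after pushdown into full outer join: ${invalidAfterPushdown(n)}")
    println(s"invalid rows after limit + left outer join: ${invalidAfterLeftOuter(n)}")
  }
}
```

With n = 1000, the pushed-down plan yields 999 rows of the form (null, x) that the true join never produces, while the left outer variant yields none in this example. The left outer variant is not a general fix: it drops right-only rows, so it would return nothing when the left side is empty even though the full outer join could still return rows.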



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

