spark-issues mailing list archives

From "Henry Robinson (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
Date Mon, 30 Oct 2017 22:00:01 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225816#comment-16225816 ]

Henry Robinson edited comment on SPARK-22211 at 10/30/17 9:59 PM:
------------------------------------------------------------------

Thinking about it some more, I think the optimization as currently implemented works as long
as a) the limit is pushed to the streaming side of the join and b) the physical join implementation
guarantees that it emits rows with non-null RHSs from the streaming side before any that have
a null RHS.

That is: say the build side contains the rows (A, C) and the streaming side contains (A, B).
A full outer join of these two inputs should produce some ordering of (A, A), (null, B),
(C, null). Now suppose we run the FOJ with LIMIT 1 pushed to the streaming side, and the pushed
limit returns (B). It's an error if the join operator sees that A on the build side has no match
and emits (A, null) before it sees that B has no match and emits (null, B). But if it emits
(null, B) first, the limit above the join kicks in and no further rows are emitted.
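The scenario above can be sketched in plain Scala (no Spark involved; the names here are hypothetical, for illustration only). It models nothing but which rows the join emits and in what order, and shows how the pushed limit's correctness hinges on guarantee b):

```scala
// Build side holds A and C; LIMIT 1 pushed to the streaming side kept only B.
val build  = Seq("A", "C")
val stream = Seq("B")

// Rows a full outer join over these (truncated) inputs would emit:
val matches    = stream.filter(build.contains).map(k => (Some(k), Some(k)))
val streamOnly = stream.filterNot(build.contains).map(k => (None, Some(k)))
val buildOnly  = build.filterNot(stream.contains).map(k => (Some(k), None))

// Guarantee b) holds: rows with a non-null RHS from the streaming side come
// first, so the limit above the join keeps (null, B) -- a row the true,
// untruncated full outer join also contains.
val goodOrder = (matches ++ streamOnly ++ buildOnly).take(1)
// goodOrder == Seq((None, Some("B")))

// Guarantee b) violated: build-side misses are emitted first, and the limit
// keeps the spurious (A, null) even though A has a match on the untruncated
// streaming side.
val badOrder = (buildOnly ++ matches ++ streamOnly).take(1)
// badOrder == Seq((Some("A"), None))
```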

It seems fragile to rely on this behaviour from every join implementation, and it has
implications for other transformations (e.g. it would not be safe to flip the join order
for a FOJ with a pushed-down limit, although it would be fine for one without). On the other
hand, it's also concerning to remove an optimization that's probably a big win for some
queries, even though it's incorrect in general. There are rewrites that would work, e.g.:

{code}
x.join(y, x('bar') === y('bar'), "outer").limit(10)
  ==>
x.sort.limit(10).join(y.sort.limit(10), x('bar') === y('bar'), "outer").sort.limit(10)
{code}

This seems like it would be correct. But for now, how about we disable the push-down
optimization in {{LimitPushDown}} and see whether there's a need to investigate more
complicated optimizations after that?
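As a one-off sanity check of that rewrite (not a proof), it can be simulated in plain Scala, modelling each input as a single column of Ints joined on the value itself, and taking "sort" to mean ascending on the coalesced join key. All names below are hypothetical:

```scala
// Model a single-column relation as Seq[Int]; the value itself is the join key.
def foj(l: Seq[Int], r: Seq[Int]): Seq[(Option[Int], Option[Int])] =
  l.filter(r.contains).map(v => (Some(v), Some(v))) ++
  l.filterNot(r.contains).map(v => (Some(v), None)) ++
  r.filterNot(l.contains).map(v => (None, Some(v)))

// "sort" on the joined output: ascending on whichever side is non-null.
def key(row: (Option[Int], Option[Int])): Int = row._1.orElse(row._2).get

val x = Seq(4, 1, 3, 2, 5)
val y = Seq(3, 6, 4, 7, 5)
val k = 2 // the LIMIT

// The rewrite: sort+limit each input, join, then sort+limit the result.
val rewritten = foj(x.sorted.take(k), y.sorted.take(k)).sortBy(key).take(k)

// Check against the untruncated join: every row the rewrite emits is a
// genuine full-outer-join row (no spurious null-extended rows), and we
// still produce k rows.
val truth = foj(x, y).toSet
val ok = rewritten.size == k && rewritten.forall(truth.contains)
```

On this input the rewrite returns (1, null) and (2, null), both of which the true full outer join contains; a single example obviously doesn't establish correctness for all inputs or sort orders.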



> LimitPushDown optimization for FullOuterJoin generates wrong results
> --------------------------------------------------------------------
>
>                 Key: SPARK-22211
>                 URL: https://issues.apache.org/jira/browse/SPARK-22211
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>         Environment: on community.cloud.databricks.com
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>            Reporter: Benyi Wang
>
> LimitPushDown pushes a LocalLimit to one side of a FullOuterJoin, but this may generate
a wrong result:
> Assume we use limit(1) and the LocalLimit is pushed to the left side, where id=999 is
selected, while the right side has 100K rows including 999. The result will be
> - one row (999, 999)
> - the remaining rows (null, xxx)
> Once you call show(), the row (999, 999) has only a 1/100000 chance of being selected by
CollectLimit.
> The actual optimization might be to
> - push down the limit
> - but convert the join to a broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/6888856075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 100000).toDF("id")
> val dr = shuffle(1 to 100000).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>    :- *Sort [id#10 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#10, 200)
>    :     +- *LocalLimit 1
>    :        +- LocalTableScan [id#10]
>    +- *Sort [id#16 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(id#16, 200)
>          +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> +----+---+
> |id  |id |
> +----+---+
> |null|148|
> +----+---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
