spark-issues mailing list archives

From "Albert Meltzer (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-20312) query optimizer calls udf with null values when it doesn't expect them
Date Wed, 12 Apr 2017 15:50:41 GMT

    [ https://issues.apache.org/jira/browse/SPARK-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15966091#comment-15966091
] 

Albert Meltzer commented on SPARK-20312:
----------------------------------------

Query plans are as follows:

{noformat}
== Parsed Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `database`.`table`, OverwriteOptions(true,Map()), false
+- Filter (((isnotnull(col2#10L) && isnotnull(col1#11L)) && isnotnull(version#59))
&& isnotnull(col5#45))
   +- Project [col4#0L, col3#5L, col2#10L, col1#11L, version#59, col5#45]
      +- Project [col4#0L, col3#5L, col2#10L, col1#11L, col5#45, 2016-11-11 AS version#59]
         +- Repartition 8, true
            +- Project [col4#0L, col3#5L, col2#10L, col1#11L, UDF(col1#11L) AS col5#45]
               +- Project [col4#0L, col3#5L, col2#10L, col1#11L]
                  +- Project [col2#10L, col1#11L, col4#0L, col3#5L]
                     +- Join LeftOuter, (col2#10L = col2#6L)
                        :- Sort [col2#10L ASC NULLS FIRST], false
                        :  +- RepartitionByExpression [col2#10L]
                        :     +- Filter isnotnull(col1#11L)
                        :        +- Filter isnotnull(col2#10L)
                        :           +- LocalRelation [col2#10L, col1#11L]
                        +- Project [col4#0L, col3#5L, col2#6L]
                           +- Project [col3#5L, col2#6L, col4#0L]
                              +- Join LeftOuter, (col3#5L = col3#1L)
                                 :- Sort [col3#5L ASC NULLS FIRST], false
                                 :  +- RepartitionByExpression [col3#5L]
                                 :     +- Filter isnotnull(col2#6L)
                                 :        +- Filter isnotnull(col3#5L)
                                 :           +- LocalRelation [col3#5L, col2#6L]
                                 +- Sort [col3#1L ASC NULLS FIRST], false
                                    +- RepartitionByExpression [col3#1L]
                                       +- Filter isnotnull(col3#1L)
                                          +- Filter isnotnull(col4#0L)
                                             +- LocalRelation [col4#0L, col3#1L]

== Analyzed Logical Plan ==
InsertIntoTable MetastoreRelation database, table, Map(version -> None, col5 -> None),
OverwriteOptions(true,Map()), false
+- Filter (((isnotnull(col2#10L) && isnotnull(col1#11L)) && isnotnull(version#59))
&& isnotnull(col5#45))
   +- Project [col4#0L, col3#5L, col2#10L, col1#11L, version#59, col5#45]
      +- Project [col4#0L, col3#5L, col2#10L, col1#11L, col5#45, 2016-11-11 AS version#59]
         +- Repartition 8, true
            +- Project [col4#0L, col3#5L, col2#10L, col1#11L, UDF(col1#11L) AS col5#45]
               +- Project [col4#0L, col3#5L, col2#10L, col1#11L]
                  +- Project [col2#10L, col1#11L, col4#0L, col3#5L]
                     +- Join LeftOuter, (col2#10L = col2#6L)
                        :- Sort [col2#10L ASC NULLS FIRST], false
                        :  +- RepartitionByExpression [col2#10L]
                        :     +- Filter isnotnull(col1#11L)
                        :        +- Filter isnotnull(col2#10L)
                        :           +- LocalRelation [col2#10L, col1#11L]
                        +- Project [col4#0L, col3#5L, col2#6L]
                           +- Project [col3#5L, col2#6L, col4#0L]
                              +- Join LeftOuter, (col3#5L = col3#1L)
                                 :- Sort [col3#5L ASC NULLS FIRST], false
                                 :  +- RepartitionByExpression [col3#5L]
                                 :     +- Filter isnotnull(col2#6L)
                                 :        +- Filter isnotnull(col3#5L)
                                 :           +- LocalRelation [col3#5L, col2#6L]
                                 +- Sort [col3#1L ASC NULLS FIRST], false
                                    +- RepartitionByExpression [col3#1L]
                                       +- Filter isnotnull(col3#1L)
                                          +- Filter isnotnull(col4#0L)
                                             +- LocalRelation [col4#0L, col3#1L]

== Optimized Logical Plan ==
InsertIntoTable MetastoreRelation database, table, Map(version -> None, col5 -> None),
OverwriteOptions(true,Map()), false
+- Project [col4#0L, col3#5L, col2#10L, col1#11L, 2016-11-11 AS version#59, col5#45]
   +- Repartition 8, true
      +- Project [col4#0L, col3#5L, col2#10L, col1#11L, UDF(col1#11L) AS col5#45]
         +- Join LeftOuter, (col2#10L = col2#6L)
            :- Sort [col2#10L ASC NULLS FIRST], false
            :  +- RepartitionByExpression [col2#10L]
            :     +- Filter ((isnotnull(col2#10L) && isnotnull(col1#11L)) &&
isnotnull(UDF(col1#11L)))
            :        +- LocalRelation [col2#10L, col1#11L]
            +- Project [col4#0L, col3#5L, col2#6L]
               +- Join LeftOuter, (col3#5L = col3#1L)
                  :- Sort [col3#5L ASC NULLS FIRST], false
                  :  +- RepartitionByExpression [col3#5L]
                  :     +- Filter (isnotnull(col3#5L) && isnotnull(col2#6L))
                  :        +- LocalRelation [col3#5L, col2#6L]
                  +- Sort [col3#1L ASC NULLS FIRST], false
                     +- RepartitionByExpression [col3#1L]
                        +- Filter (isnotnull(col4#0L) && isnotnull(col3#1L))
                           +- LocalRelation [col4#0L, col3#1L]

== Physical Plan ==
InsertIntoHiveTable MetastoreRelation database, table, Map(version -> None, col5 ->
None), true, false
+- *Project [col4#0L, col3#5L, col2#10L, col1#11L, 2016-11-11 AS version#59, col5#45]
   +- Exchange RoundRobinPartitioning(8)
      +- *Project [col4#0L, col3#5L, col2#10L, col1#11L, UDF(col1#11L) AS col5#45]
         +- *BroadcastHashJoin [col2#10L], [col2#6L], LeftOuter, BuildRight
            :- *Sort [col2#10L ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(col2#10L, 200)
            :     +- *Filter ((isnotnull(col2#10L) && isnotnull(col1#11L)) &&
isnotnull(UDF(col1#11L)))
            :        +- LocalTableScan [col2#10L, col1#11L]
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[2, bigint, true]))
               +- *Project [col4#0L, col3#5L, col2#6L]
                  +- *BroadcastHashJoin [col3#5L], [col3#1L], LeftOuter, BuildRight
                     :- *Sort [col3#5L ASC NULLS FIRST], false, 0
                     :  +- Exchange hashpartitioning(col3#5L, 200)
                     :     +- *Filter (isnotnull(col3#5L) && isnotnull(col2#6L))
                     :        +- LocalTableScan [col3#5L, col2#6L]
                     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint,
false]))
                        +- *Sort [col3#1L ASC NULLS FIRST], false, 0
                           +- Exchange hashpartitioning(col3#1L, 200)
                              +- *Filter (isnotnull(col4#0L) && isnotnull(col3#1L))
                                 +- LocalTableScan [col4#0L, col3#1L]
{noformat}
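Note that in the optimized plan above, the inferred isnotnull(UDF(col1#11L)) constraint sits in the same Filter as the explicit null checks, so conjunct ordering no longer protects the UDF from null input. A possible workaround, sketched here with hypothetical names (not from the report), is to make the UDF itself null-tolerant so it is safe wherever the optimizer evaluates it:

```scala
import org.apache.spark.sql.functions.udf

// Stand-in for the report's UDF body, which assumes a non-null input.
def func(value: Long): Int = value.toInt

// Null-tolerant wrapper: accept a boxed Long and handle null explicitly,
// returning null instead of throwing. Safe regardless of where the
// optimizer places the inferred isnotnull(UDF(...)) predicate.
val safeFunc = udf { (value: java.lang.Long) =>
  if (value == null) null.asInstanceOf[java.lang.Integer]
  else java.lang.Integer.valueOf(func(value))
}
```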


> query optimizer calls udf with null values when it doesn't expect them
> ----------------------------------------------------------------------
>
>                 Key: SPARK-20312
>                 URL: https://issues.apache.org/jira/browse/SPARK-20312
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Albert Meltzer
>
> When optimizing an outer join, Spark passes an empty row to both sides to see whether
nulls would be ignored (side comment: for left and right outer joins it subsequently
ignores the assessment on the preserved side).
> For some reason, a condition such as "x IS NOT NULL && udf(x) IS NOT NULL" may end up
evaluating the right-hand side first, raising an exception if the UDF doesn't expect a
null input (which the left-hand check, if evaluated first, would have ruled out).
> An example SIMILAR to the following (see actual query plans separately):
> def func(value: Any): Int = ... // returns an AnyVal, which probably causes an IS NOT
NULL filter to be added on the result
> val df1 = sparkSession
>   .table(...)
>   .select("col1", "col2") // both LongType
> val df11 = df1
>   .filter(df1("col1").isNotNull)
>   .withColumn("col3", functions.udf(func)(df1("col1")))
>   .repartition(df1("col2"))
>   .sortWithinPartitions(df1("col2"))
> val df2 = ... // load other data containing col2, similarly repartitioned and sorted
> val df3 =
>   df11.join(df2, Seq("col2"), "left_outer")
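
The evaluation-order hazard described above can be illustrated without Spark at all. A plain-Scala sketch (the hypothetical function f stands in for the UDF) shows that a null guard only protects a call when it is actually evaluated first:

```scala
object ShortCircuitDemo {
  // Stand-in for a UDF body that does not expect null input: unboxing a
  // null java.lang.Long throws a NullPointerException.
  def f(x: java.lang.Long): Int = x.toInt

  def main(args: Array[String]): Unit = {
    val x: java.lang.Long = null

    // As written: the null check short-circuits and f is never called.
    println(x != null && f(x) > 0) // prints "false", no exception

    // As effectively reordered by the optimizer: f sees the null and throws.
    try {
      println(f(x) > 0 && x != null)
    } catch {
      case _: NullPointerException => println("NPE: f was called with null")
    }
  }
}
```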



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
