spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-20359][SQL] Avoid unnecessary execution in EliminateOuterJoin optimization that can lead to NPE
Date Wed, 19 Apr 2017 07:54:55 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 a4c1ebc1d -> 171bf656f


[SPARK-20359][SQL] Avoid unnecessary execution in EliminateOuterJoin optimization that can
lead to NPE

Avoid unnecessary execution that can lead to an NPE in EliminateOuterJoin, and add a test in DataFrameSuite
to confirm that the NPE is no longer thrown.

## What changes were proposed in this pull request?
Change leftHasNonNullPredicate and rightHasNonNullPredicate to lazy vals so they are evaluated
only when needed.
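
As a rough illustration of why laziness matters here, the sketch below reproduces the shape of the change without any Spark dependency. Every name in it (`LazyValSketch`, `riskyCheck`, `JoinKind`, `strictVersion`, `lazyVersion`) is hypothetical and not part of Spark. `riskyCheck` stands in for a predicate check that can throw, and only the `RightOuter` guard visible in the diff is modelled.

```scala
object LazyValSketch {
  // Counts how often the (hypothetical) risky check has been run.
  var evaluations = 0

  // Hypothetical stand-in for a predicate check that may throw when evaluated.
  def riskyCheck(): Boolean = {
    evaluations += 1
    throw new NullPointerException("predicate was evaluated")
  }

  // Simplified, hypothetical join types; not Spark's JoinType hierarchy.
  sealed trait JoinKind
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind
  case object Inner extends JoinKind

  // Strict val: the check runs immediately, for every join kind.
  def strictVersion(kind: JoinKind): JoinKind = {
    val leftHasNonNull = riskyCheck()
    kind match {
      case RightOuter if leftHasNonNull => Inner
      case other => other
    }
  }

  // Lazy val: the check runs only if a guard actually reads the value.
  def lazyVersion(kind: JoinKind): JoinKind = {
    lazy val leftHasNonNull = riskyCheck()
    kind match {
      case RightOuter if leftHasNonNull => Inner
      case other => other
    }
  }
}
```

In this sketch, `lazyVersion(LeftOuter)` returns `LeftOuter` without ever calling `riskyCheck`, because the guard that reads the lazy value sits behind a pattern that does not match. `strictVersion(LeftOuter)` throws before the match is reached. `lazyVersion(RightOuter)` would still throw, so laziness only skips evaluations that no guard needs.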

## How was this patch tested?

Added a test in DataFrameSuite that failed before this fix and now succeeds. Note that a test
in the catalyst project would be better, but I am unsure how to do this.


Author: Koert Kuipers <koert@tresata.com>

Closes #17660 from koertkuipers/feat-catch-npe-in-eliminate-outer-join.

(cherry picked from commit 608bf30f0b9759fd0b9b9f33766295550996a9eb)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/171bf656
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/171bf656
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/171bf656

Branch: refs/heads/branch-2.1
Commit: 171bf656f8a940ee334c13e162233381de38c8bd
Parents: a4c1ebc
Author: Koert Kuipers <koert@tresata.com>
Authored: Wed Apr 19 15:52:47 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Wed Apr 19 15:54:48 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/optimizer/joins.scala   |  4 ++--
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala  | 10 ++++++++++
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/171bf656/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index bfe529e..e314955 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -121,8 +121,8 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
     val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
     val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
 
-    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
-    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
+    lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
+    lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
 
     join.joinType match {
       case RightOuter if leftHasNonNullPredicate => Inner

http://git-wip-us.apache.org/repos/asf/spark/blob/171bf656/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index ec201f3..149db98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1755,4 +1755,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         "Cannot have map type columns in DataFrame which calls set operations"))
     }
   }
+
+  test("SPARK-20359: catalyst outer join optimization should not throw npe") {
+    val df1 = Seq("a", "b", "c").toDF("x")
+      .withColumn("y", udf{ (x: String) => x.substring(0, 1) + "!" }.apply($"x"))
+    val df2 = Seq("a", "b").toDF("x1")
+    df1
+      .join(df2, df1("x") === df2("x1"), "left_outer")
+      .filter($"x1".isNotNull || !$"y".isin("a!"))
+      .count
+  }
 }



