spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yamam...@apache.org
Subject [spark] branch branch-2.4 updated: [SPARK-32693][SQL][2.4] Compare two dataframes with same schema except nullable property
Date Sat, 29 Aug 2020 21:55:12 GMT
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new ef42225  [SPARK-32693][SQL][2.4] Compare two dataframes with same schema except nullable
property
ef42225 is described below

commit ef422257fe47ec0e8a0e94c6671c5d6316944620
Author: Liang-Chi Hsieh <viirya@gmail.com>
AuthorDate: Sun Aug 30 06:52:15 2020 +0900

    [SPARK-32693][SQL][2.4] Compare two dataframes with same schema except nullable property
    
    ### What changes were proposed in this pull request?
    
    This PR changes key data types check in `HashJoin` to use `sameType`. This backports #29555
to branch-2.4.
    
    ### Why are the changes needed?
    
    Looks at the resolving condition of `SetOperation`, it requires only each left data types
should be `sameType` as the right ones. Logically the `EqualTo` expression in equi-join, also
requires only left data type `sameType` as right data type. Then `HashJoin` requires left
keys data type exactly the same as right keys data type, looks not reasonable.
    
    It makes inconsistent results when doing `except` between two dataframes.
    
    If two dataframes don't have nested fields, even their field nullable property different,
`HashJoin` passes the key type check because it checks field individually so field nullable
property is ignored.
    
    If two dataframes have nested fields like struct, `HashJoin` fails the key type check
because now it compare two struct types and nullable property now affects.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Making consistent `except` operation between dataframes.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #29576 from viirya/SPARK-32693-2.4.
    
    Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
    Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
---
 .../spark/sql/execution/joins/HashJoin.scala       |  7 ++--
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  | 39 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index b197bf6..141f388a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -62,8 +62,11 @@ trait HashJoin {
   }
 
   protected lazy val (buildKeys, streamedKeys) = {
-    require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType),
-      "Join keys from two sides should have same types")
+    require(leftKeys.length == rightKeys.length &&
+      leftKeys.map(_.dataType)
+        .zip(rightKeys.map(_.dataType))
+        .forall(types => types._1.sameType(types._2)),
+      "Join keys from two sides should have same length and types")
     val lkeys = HashJoin.rewriteKeyExpr(leftKeys).map(BindReferences.bindReference(_, left.output))
     val rkeys = HashJoin.rewriteKeyExpr(rightKeys)
       .map(BindReferences.bindReference(_, right.output))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index e6b30f9..9f77314 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
 
 class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -295,4 +298,40 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-32693: Compare two dataframes with same schema except nullable property") {
+    val schema1 = StructType(
+      StructField("a", IntegerType, false) ::
+        StructField("b", IntegerType, false) ::
+        StructField("c", IntegerType, false) :: Nil)
+    val rowSeq1: List[Row] = List(Row(10, 1, 1), Row(10, 50, 2))
+    val df1 = spark.createDataFrame(rowSeq1.asJava, schema1)
+
+    val schema2 = StructType(
+      StructField("a", IntegerType) ::
+        StructField("b", IntegerType) ::
+        StructField("c", IntegerType) :: Nil)
+    val rowSeq2: List[Row] = List(Row(10, 1, 1))
+    val df2 = spark.createDataFrame(rowSeq2.asJava, schema2)
+
+    checkAnswer(df1.except(df2), Row(10, 50, 2))
+
+    val schema3 = StructType(
+      StructField("a", IntegerType, false) ::
+        StructField("b", IntegerType, false) ::
+        StructField("c", IntegerType, false) ::
+        StructField("d", schema1, false) :: Nil)
+    val rowSeq3: List[Row] = List(Row(10, 1, 1, Row(10, 1, 1)), Row(10, 50, 2, Row(10, 50,
2)))
+    val df3 = spark.createDataFrame(rowSeq3.asJava, schema3)
+
+    val schema4 = StructType(
+      StructField("a", IntegerType) ::
+        StructField("b", IntegerType) ::
+        StructField("b", IntegerType) ::
+        StructField("d", schema2) :: Nil)
+    val rowSeq4: List[Row] = List(Row(10, 1, 1, Row(10, 1, 1)))
+    val df4 = spark.createDataFrame(rowSeq4.asJava, schema4)
+
+    checkAnswer(df3.except(df4), Row(10, 50, 2, Row(10, 50, 2)))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message