spark-commits mailing list archives

From hvanhov...@apache.org
Subject spark git commit: [SPARK-17123][SQL][BRANCH-2.0] Use type-widened encoder for DataFrame for set operations
Date Sun, 23 Oct 2016 12:00:42 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 3d5878751 -> e21e9d416


[SPARK-17123][SQL][BRANCH-2.0] Use type-widened encoder for DataFrame for set operations

## What changes were proposed in this pull request?

This PR backports https://github.com/apache/spark/pull/15072

Please note that the test code differs slightly from master's, as https://github.com/apache/spark/pull/14786
was only merged into master; branch-2.0 therefore does not support type widening between `DateType`
and `TimestampType`, so both types were taken out of the test.
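
For illustration, here is a minimal sketch of the scenario this fixes (not part of the patch; it assumes a Spark shell, i.e. a `SparkSession` named `spark` with `spark.implicits._` available, and mirrors the unit test below). Set operations let the analyzer widen column types, so the result must be decoded with an encoder derived from the widened schema rather than with the left-hand DataFrame's original row encoder:

```scala
import java.sql.Timestamp
import spark.implicits._

val left = Seq(
  (BigDecimal.valueOf(1), new Timestamp(2))
).toDF("decimal", "timestamp")   // schema: DecimalType, TimestampType

val right = Seq(
  (10.5D, "string")
).toDF("decimal", "timestamp")   // schema: DoubleType, StringType

// The analyzed union widens each column (decimal/double -> DoubleType,
// timestamp/string -> StringType). Before this fix, collecting the result
// could fail because it was decoded with `left`'s pre-widening row encoder.
left.union(right).collect()
left.except(right).collect()
left.intersect(right).collect()
```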

## How was this patch tested?

Unit test in `DataFrameSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15601 from HyukjinKwon/backport-17123.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e21e9d41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e21e9d41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e21e9d41

Branch: refs/heads/branch-2.0
Commit: e21e9d4162cc798d9ec43ef984d17b89dab77826
Parents: 3d58787
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Sun Oct 23 14:00:35 2016 +0200
Committer: Herman van Hovell <hvanhovell@databricks.com>
Committed: Sun Oct 23 14:00:35 2016 +0200

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/Dataset.scala   | 16 +++++++++++++---
 .../scala/org/apache/spark/sql/DataFrameSuite.scala | 16 ++++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e21e9d41/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0b236a0..4946bbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1456,7 +1456,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 2.0.0
    */
-  def union(other: Dataset[T]): Dataset[T] = withTypedPlan {
+  def union(other: Dataset[T]): Dataset[T] = withSetOperator {
     // This breaks caching, but it's usually ok because it addresses a very specific use case:
     // using union to union many files or partitions.
     CombineUnions(Union(logicalPlan, other.logicalPlan))
@@ -1472,7 +1472,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 1.6.0
    */
-  def intersect(other: Dataset[T]): Dataset[T] = withTypedPlan {
+  def intersect(other: Dataset[T]): Dataset[T] = withSetOperator {
     Intersect(logicalPlan, other.logicalPlan)
   }
 
@@ -1486,7 +1486,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 2.0.0
    */
-  def except(other: Dataset[T]): Dataset[T] = withTypedPlan {
+  def except(other: Dataset[T]): Dataset[T] = withSetOperator {
     Except(logicalPlan, other.logicalPlan)
   }
 
@@ -2607,4 +2607,14 @@ class Dataset[T] private[sql](
   @inline private def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
     Dataset(sparkSession, logicalPlan)
   }
+
+  /** A convenient function to wrap a set based logical plan and produce a Dataset. */
+  @inline private def withSetOperator[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
+    if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) {
+      // Set operators widen types (change the schema), so we cannot reuse the row encoder.
+      Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
+    } else {
+      Dataset(sparkSession, logicalPlan)
+    }
+  }
 }
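
A side note on the new helper (a standalone sketch, not part of the diff): the `classOf[Row]` check singles out DataFrames because a DataFrame's row encoder is derived from the plan's schema, which set operators may widen, whereas a typed `Dataset[T]`'s encoder is fixed by `T`. The schema change that makes the old encoder unusable can be seen directly:

```scala
import org.apache.spark.sql.SparkSession

// Assumes a local session; names are illustrative.
val spark = SparkSession.builder().master("local[*]").appName("widen-sketch").getOrCreate()
import spark.implicits._

val ints    = Seq(1, 2).toDF("v")   // v: IntegerType
val doubles = Seq(0.5D).toDF("v")   // v: DoubleType

// The union's analyzed schema is widened to DoubleType, so its rows must be
// decoded with an encoder for the new schema, which Dataset.ofRows derives.
println(ints.union(doubles).schema("v").dataType)   // DoubleType
```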

http://git-wip-us.apache.org/repos/asf/spark/blob/e21e9d41/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 7ab0fe0..f8d7ddd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import java.io.File
 import java.nio.charset.StandardCharsets
+import java.sql.{Date, Timestamp}
 import java.util.UUID
 
 import scala.language.postfixOps
@@ -1585,4 +1586,19 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-17123: Performing set operations that combine non-scala native types") {
+    val dates = Seq(
+      (BigDecimal.valueOf(1), new Timestamp(2)),
+      (BigDecimal.valueOf(4), new Timestamp(5))
+    ).toDF("decimal", "timestamp")
+
+    val widenTypedRows = Seq(
+      (10.5D, "string")
+    ).toDF("decimal", "timestamp")
+
+    dates.union(widenTypedRows).collect()
+    dates.except(widenTypedRows).collect()
+    dates.intersect(widenTypedRows).collect()
+  }
 }


