spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-15910][SQL] Check schema consistency when using Kryo encoder to convert DataFrame to Dataset
Date Tue, 14 Jun 2016 00:44:00 GMT
Repository: spark
Updated Branches:
  refs/heads/master a6babca1b -> 7b9071eea


[SPARK-15910][SQL] Check schema consistency when using Kryo encoder to convert DataFrame to
Dataset

## What changes were proposed in this pull request?

This PR enforces a schema check when converting a DataFrame to a Dataset using the Kryo encoder. For
example:

**Before the change:**

Schema is NOT checked when converting a DataFrame to a Dataset using the Kryo encoder.
```
scala> case class B(b: Int)
scala> implicit val encoder = Encoders.kryo[B]
scala> val df = Seq((1)).toDF("b")
scala> val ds = df.as[B] // Schema compatibility is NOT checked
```

**After the change:**
An AnalysisException is reported since the schema is NOT compatible.
```
scala> val ds = Seq((1)).toDF("b").as[B]
org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`b` AS BINARY)' due to data type
mismatch: cannot cast IntegerType to BinaryType;
...
```

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13632 from clockfly/spark-15910.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b9071ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b9071ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b9071ee

Branch: refs/heads/master
Commit: 7b9071eeaa62fd9a51d9e94cfd479224b8341517
Parents: a6babca
Author: Sean Zhong <seanzhong@databricks.com>
Authored: Mon Jun 13 17:43:55 2016 -0700
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Mon Jun 13 17:43:55 2016 -0700

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/sql/Encoders.scala      | 6 ++++--
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala  | 9 +++++++++
 2 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7b9071ee/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
index 673c587..e72f67c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -25,8 +25,8 @@ import scala.reflect.runtime.universe.TypeTag
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast}
 import org.apache.spark.sql.catalyst.expressions.objects.{DecodeUsingSerializer, EncodeUsingSerializer}
-import org.apache.spark.sql.catalyst.expressions.BoundReference
 import org.apache.spark.sql.types._
 
 /**
@@ -209,7 +209,9 @@ object Encoders {
           BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
       deserializer =
         DecodeUsingSerializer[T](
-          GetColumnByOrdinal(0, BinaryType), classTag[T], kryo = useKryo),
+          Cast(GetColumnByOrdinal(0, BinaryType), BinaryType),
+          classTag[T],
+          kryo = useKryo),
       clsTag = classTag[T]
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9071ee/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 96d85f1..f02a314 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -453,6 +453,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
         (KryoData(2), KryoData(2))))
   }
 
+  test("Kryo encoder: check the schema mismatch when converting DataFrame to Dataset") {
+    implicit val kryoEncoder = Encoders.kryo[KryoData]
+    val df = Seq((1)).toDF("a")
+    val e = intercept[AnalysisException] {
+      df.as[KryoData]
+    }.message
+    assert(e.contains("cannot cast IntegerType to BinaryType"))
+  }
+
   test("Java encoder") {
     implicit val kryoEncoder = Encoders.javaSerialization[JavaData]
     val ds = Seq(JavaData(1), JavaData(2)).toDS()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message