spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-17693][SQL] Fixed Insert Failure To Data Source Tables when the Schema has the Comment Field
Date Wed, 26 Oct 2016 07:38:41 GMT
Repository: spark
Updated Branches:
  refs/heads/master 12b3e8d2e -> 93b8ad184


[SPARK-17693][SQL] Fixed Insert Failure To Data Source Tables when the Schema has the Comment
Field

### What changes were proposed in this pull request?
```SQL
CREATE TABLE tab1(col1 int COMMENT 'a', col2 int) USING parquet
INSERT INTO TABLE tab1 SELECT 1, 2
```
The insert attempt fails if the target table has a column with a comment. The error message is
confusing to external users:
```
assertion failed: No plan for InsertIntoTable Relation[col1#15,col2#16] parquet, false, false
+- Project [1 AS col1#19, 2 AS col2#20]
   +- OneRowRelation$
```

This PR fixes the above bug by also comparing column metadata when checking the query's schema
against the table's schema. If they do not match, the table's column metadata is copied onto the
query output. This is an alternative to https://github.com/apache/spark/pull/15266

### How was this patch tested?
Added a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15615 from gatorsmile/insertDataSourceTableWithCommentSolution2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93b8ad18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93b8ad18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93b8ad18

Branch: refs/heads/master
Commit: 93b8ad184aa3634f340d43a8bdf99836ef3d4f6c
Parents: 12b3e8d
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Wed Oct 26 00:38:34 2016 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Wed Oct 26 00:38:34 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/rules.scala | 10 ++++-
 .../apache/spark/sql/sources/InsertSuite.scala  | 42 ++++++++++++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/93b8ad18/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index cf501cd..4647b11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -248,10 +248,16 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
       expectedOutput: Seq[Attribute]): InsertIntoTable = {
     val newChildOutput = expectedOutput.zip(insert.child.output).map {
       case (expected, actual) =>
-        if (expected.dataType.sameType(actual.dataType) && expected.name == actual.name) {
+        if (expected.dataType.sameType(actual.dataType) &&
+            expected.name == actual.name &&
+            expected.metadata == actual.metadata) {
           actual
         } else {
-          Alias(Cast(actual, expected.dataType), expected.name)()
+          // Renaming is needed for handling the following cases like
+          // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
+          // 2) Target tables have column metadata
+          Alias(Cast(actual, expected.dataType), expected.name)(
+            explicitMetadata = Option(expected.metadata))
         }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/93b8ad18/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 5eb5464..4a85b59 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -185,6 +185,48 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
     )
   }
 
+  test("INSERT INTO TABLE with Comment in columns") {
+    val tabName = "tab1"
+    withTable(tabName) {
+      sql(
+        s"""
+           |CREATE TABLE $tabName(col1 int COMMENT 'a', col2 int)
+           |USING parquet
+         """.stripMargin)
+      sql(s"INSERT INTO TABLE $tabName SELECT 1, 2")
+
+      checkAnswer(
+        sql(s"SELECT col1, col2 FROM $tabName"),
+        Row(1, 2) :: Nil
+      )
+    }
+  }
+
+  test("INSERT INTO TABLE - complex type but different names") {
+    val tab1 = "tab1"
+    val tab2 = "tab2"
+    withTable(tab1, tab2) {
+      sql(
+        s"""
+           |CREATE TABLE $tab1 (s struct<a: string, b: string>)
+           |USING parquet
+         """.stripMargin)
+      sql(s"INSERT INTO TABLE $tab1 SELECT named_struct('col1','1','col2','2')")
+
+      sql(
+        s"""
+           |CREATE TABLE $tab2 (p struct<c: string, d: string>)
+           |USING parquet
+         """.stripMargin)
+      sql(s"INSERT INTO TABLE $tab2 SELECT * FROM $tab1")
+
+      checkAnswer(
+        spark.table(tab1),
+        spark.table(tab2)
+      )
+    }
+  }
+
   test("it is not allowed to write to a table while querying it.") {
     val message = intercept[AnalysisException] {
       sql(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message