spark-commits mailing list archives

From wenc...@apache.org
Subject [spark] branch branch-3.0 updated: [SPARK-33844][SQL][3.0] InsertIntoHiveDir command should check col name too
Date Tue, 05 Jan 2021 19:45:40 GMT
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9ba6db9  [SPARK-33844][SQL][3.0] InsertIntoHiveDir command should check col name too
9ba6db9 is described below

commit 9ba6db94a2fef23053f208b0b740b8b0a1cc7d88
Author: angerszhu <angers.zhu@gmail.com>
AuthorDate: Tue Jan 5 19:45:09 2021 +0000

    [SPARK-33844][SQL][3.0] InsertIntoHiveDir command should check col name too
    
    ### What changes were proposed in this pull request?
    
    In hive-1.2.1, the Hive serde simply splits `serdeConstants.LIST_COLUMNS` and `serdeConstants.LIST_COLUMN_TYPES` on commas.
    
    When we run the unit test below on Spark 2.4,
    ```
      test("insert overwrite directory with comma col name") {
        withTempDir { dir =>
          val path = dir.toURI.getPath
    
          val v1 =
            s"""
               | INSERT OVERWRITE DIRECTORY '${path}'
               | STORED AS TEXTFILE
               | SELECT 1 as a, 'c' as b, if(1 = 1, "true", "false")
             """.stripMargin
    
          sql(v1).explain(true)
    
          sql(v1).show()
        }
      }
    ```
    it fails as follows, because the auto-generated column name contains `,`, so the column-name list and the column-type list end up with different sizes.
    ```
    19:56:05.618 ERROR org.apache.spark.sql.execution.datasources.FileFormatWriter:  [ angerszhu ] Aborting job dd774f18-93fa-431f-9468-3534c7d8acda.
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 5 elements while columns.types has 3 elements!
    	at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
    	at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
    	at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
    	at org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:119)
    	at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:103)
    	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
    	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
    	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:287)
    	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:219)
    	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:218)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    	at org.apache.spark.scheduler.Task.run(Task.scala:121)
    	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$12.apply(Executor.scala:461)
    	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:467)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    
    ```
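    
    A minimal sketch of why the counts diverge (plain Scala, mirroring the unescaped comma split in `LazySerDeParameters`; the property values are assumed for illustration):
    ```
    // hive-1.2.1 joins column names with ',' and splits them back on ','.
    // A generated column name that itself contains commas inflates the list.
    val columns     = """a,b,if(1 = 1, "true", "false")"""  // 3 logical columns
    val columnTypes = "int,string,string"                   // 3 types
    val names = columns.split(",")                          // 5 elements
    val types = columnTypes.split(",")                      // 3 elements
    assert(names.length != types.length)                    // -> SerDeException
    ```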
    
    Since hive-2.3, Hive sets COLUMN_NAME_DELIMITER to a special character when a column name contains ',':
    https://github.com/apache/hive/blob/6f4c35c9e904d226451c465effdc5bfd31d395a0/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L1180-L1188
    https://github.com/apache/hive/blob/6f4c35c9e904d226451c465effdc5bfd31d395a0/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L1044-L1075
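    
    A hedged Scala transliteration of that selection logic (the real code lives in the linked `MetaStoreUtils` methods; the "special char" is assumed here to be `'\u0000'`):
    ```
    // If any column name contains a comma, switch to a delimiter that
    // cannot occur inside a name; otherwise keep the plain comma.
    def columnNameDelimiter(colNames: Seq[String]): String =
      if (colNames.exists(_.contains(","))) "\u0000" else ","
    ```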
    
    And in script transform, we already parse column names to avoid this problem:
    https://github.com/apache/spark/blob/554600c2af0dbc8979955807658fafef5dc66c08/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala#L257-L261
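    
    A hedged sketch of that idea (hand the serde synthetic, comma-free names instead of raw attribute names; not the exact Spark code):
    ```
    import org.apache.spark.sql.catalyst.expressions.Attribute
    
    // Positional names like c0, c1, ... can never contain commas, so the
    // joined name list always splits back to the right number of columns.
    def safeSerdeColumns(attrs: Seq[Attribute]): (String, String) = {
      val names = attrs.indices.map(i => s"c$i").mkString(",")
      val types = attrs.map(_.dataType.catalogString).mkString(",")
      (names, types)
    }
    ```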
    
    So I think `InsertIntoHiveDirCommand` should do the same thing. I have verified that this approach makes Spark 2.4 work well too.
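    
    A minimal sketch of the kind of validation this triggers (illustrative only; the real check in `DDLUtils.checkDataColNames` is delegated per serde, and the character set below is taken from the error message asserted in the new test):
    ```
    // Reject attribute names the downstream serde cannot round-trip.
    val invalidChars = " ,;{}()\n\t="
    def checkFieldName(name: String): Unit =
      require(!name.exists(c => invalidChars.contains(c)),
        s"""Attribute name "$name" contains invalid character(s) among "$invalidChars". Please use alias to rename it.""")
    ```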
    
    ### Why are the changes needed?
    Safer use of the serde.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT.
    
    Closes #31038 from AngersZhuuuu/SPARK-33844-3.0.
    
    Authored-by: angerszhu <angers.zhu@gmail.com>
    Signed-off-by: Wenchen Fan <wenchen@databricks.com>
---
 .../sql/hive/execution/InsertIntoHiveDirCommand.scala      |  9 +++++++--
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 14 ++++++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index b66c302..7ef637e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -63,12 +64,16 @@ case class InsertIntoHiveDirCommand(
       s"when inserting into ${storage.locationUri.get}",
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
-    val hiveTable = HiveClientImpl.toHiveTable(CatalogTable(
+    val table = CatalogTable(
       identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")),
+      provider = Some(DDLUtils.HIVE_PROVIDER),
       tableType = org.apache.spark.sql.catalyst.catalog.CatalogTableType.VIEW,
       storage = storage,
       schema = outputColumns.toStructType
-    ))
+    )
+    DDLUtils.checkDataColNames(table)
+
+    val hiveTable = HiveClientImpl.toHiveTable(table)
     hiveTable.getMetadata.put(serdeConstants.SERIALIZATION_LIB,
       storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 765119d..c8726c7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2766,4 +2766,18 @@ class HiveDDLSuite
       checkAnswer(sql("SHOW PARTITIONS t"), Seq.empty)
     }
   }
+
+  test("SPARK-33844: Insert overwrite directory should check schema too") {
+    withView("v") {
+      spark.range(1).createTempView("v")
+      withTempPath { path =>
+        val e = intercept[AnalysisException] {
+          spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}' " +
+            s"STORED AS PARQUET SELECT ID, if(1=1, 1, 0), abs(id), '^-' FROM v")
+        }.getMessage
+        assert(e.contains("Attribute name \"(IF((1 = 1), 1, 0))\" contains" +
+          " invalid character(s) among \" ,;{}()\\n\\t=\". Please use alias to rename it."))
+      }
+    }
+  }
 }


