spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-10672] [SQL] Do not fail when we cannot save the metadata of a data source table in a hive compatible way
Date Tue, 22 Sep 2015 20:29:44 GMT
Repository: spark
Updated Branches:
  refs/heads/master 5017c685f -> 2204cdb28


[SPARK-10672] [SQL] Do not fail when we cannot save the metadata of a data source table in
a hive compatible way

https://issues.apache.org/jira/browse/SPARK-10672

With changes in this PR, we will fall back to saving the metadata of a table in a Spark SQL specific
way if we fail to save it in a Hive compatible way (Hive throws an exception because of its
internal restrictions, e.g. binary and decimal types cannot be saved to parquet if the metastore
is running Hive 0.13). I manually tested the fix with the following test in `DataSourceWithHiveMetastoreCatalogSuite`
(`spark.sql.hive.metastore.version=0.13` and `spark.sql.hive.metastore.jars=maven`).

```
    test(s"fail to save metadata of a parquet table in hive 0.13") {
      withTempPath { dir =>
        withTable("t") {
          val path = dir.getCanonicalPath

          sql(
            s"""CREATE TABLE t USING $provider
               |OPTIONS (path '$path')
               |AS SELECT 1 AS d1, cast("val_1" as binary) AS d2
             """.stripMargin)

          sql(
            s"""describe formatted t
             """.stripMargin).collect.foreach(println)

          sqlContext.table("t").show
        }
      }
    }
  }
```

Without this fix, we will fail with the following error.
```
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.UnsupportedOperationException:
Unknown field type: binary
	at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:619)
	at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:576)
	at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply$mcV$sp(ClientWrapper.scala:359)
	at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply(ClientWrapper.scala:357)
	at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply(ClientWrapper.scala:357)
	at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:256)
	at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:211)
	at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:248)
	at org.apache.spark.sql.hive.client.ClientWrapper.createTable(ClientWrapper.scala:357)
	at org.apache.spark.sql.hive.HiveMetastoreCatalog.createDataSourceTable(HiveMetastoreCatalog.scala:358)
	at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:285)
	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
	at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:58)
	at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:144)
	at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:129)
	at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
	at org.apache.spark.sql.test.SQLTestUtils$$anonfun$sql$1.apply(SQLTestUtils.scala:56)
	at org.apache.spark.sql.test.SQLTestUtils$$anonfun$sql$1.apply(SQLTestUtils.scala:56)
	at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:165)
	at org.apache.spark.sql.test.SQLTestUtils$class.withTable(SQLTestUtils.scala:150)
	at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.withTable(HiveMetastoreCatalogSuite.scala:52)
	at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(HiveMetastoreCatalogSuite.scala:162)
	at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(HiveMetastoreCatalogSuite.scala:161)
	at org.apache.spark.sql.test.SQLTestUtils$class.withTempPath(SQLTestUtils.scala:125)
	at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.withTempPath(HiveMetastoreCatalogSuite.scala:52)
	at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:161)
	at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:161)
	at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:161)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42)
	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
	at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
	at org.scalatest.Suite$class.run(Suite.scala:1424)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
	at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.org$scalatest$BeforeAndAfterAll$$super$run(HiveMetastoreCatalogSuite.scala:52)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
	at org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.run(HiveMetastoreCatalogSuite.scala:52)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
	at sbt.ForkMain$Run$2.call(ForkMain.java:294)
	at sbt.ForkMain$Run$2.call(ForkMain.java:284)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Unknown field type: binary
	at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.getObjectInspector(ArrayWritableObjectInspector.java:108)
	at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.<init>(ArrayWritableObjectInspector.java:60)
	at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.initialize(ParquetHiveSerDe.java:113)
	at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:339)
	at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:288)
	at org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:194)
	at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:597)
	... 76 more
```

Author: Yin Huai <yhuai@databricks.com>

Closes #8824 from yhuai/datasourceMetadata.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2204cdb2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2204cdb2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2204cdb2

Branch: refs/heads/master
Commit: 2204cdb28483b249616068085d4e88554fe6acef
Parents: 5017c68
Author: Yin Huai <yhuai@databricks.com>
Authored: Tue Sep 22 13:29:39 2015 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Tue Sep 22 13:29:39 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 101 +++++++++----------
 1 file changed, 50 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2204cdb2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0c1b41e..012634c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -309,69 +309,68 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface,
hive: Hive
     }
 
     // TODO: Support persisting partitioned data source relations in Hive compatible format
-    val hiveTable = (maybeSerDe, dataSource.relation) match {
+    val qualifiedTableName = tableIdent.quotedString
+    val (hiveCompitiableTable, logMessage) = (maybeSerDe, dataSource.relation) match {
       case (Some(serde), relation: HadoopFsRelation)
-          if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
-        // Hive ParquetSerDe doesn't support decimal type until 1.2.0.
-        val isParquetSerDe = serde.inputFormat.exists(_.toLowerCase.contains("parquet"))
-        val hasDecimalFields = relation.schema.existsRecursively(_.isInstanceOf[DecimalType])
-
-        val hiveParquetSupportsDecimal = client.version match {
-          case org.apache.spark.sql.hive.client.hive.v1_2 => true
-          case _ => false
-        }
-
-        if (isParquetSerDe && !hiveParquetSupportsDecimal && hasDecimalFields)
{
-          // If Hive version is below 1.2.0, we cannot save Hive compatible schema to
-          // metastore when the file format is Parquet and the schema has DecimalType.
-          logWarning {
-            "Persisting Parquet relation with decimal field(s) into Hive metastore in Spark
SQL " +
-              "specific format, which is NOT compatible with Hive. Because ParquetHiveSerDe
in " +
-              s"Hive ${client.version.fullVersion} doesn't support decimal type. See HIVE-6384."
-          }
-          newSparkSQLSpecificMetastoreTable()
-        } else {
-          logInfo {
-            "Persisting data source relation with a single input path into Hive metastore
in " +
-              s"Hive compatible format. Input path: ${relation.paths.head}"
-          }
-          newHiveCompatibleMetastoreTable(relation, serde)
-        }
+        if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
+        val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
+        val message =
+          s"Persisting data source relation $qualifiedTableName with a single input path
" +
+            s"into Hive metastore in Hive compatible format. Input path: ${relation.paths.head}."
+        (Some(hiveTable), message)
 
       case (Some(serde), relation: HadoopFsRelation) if relation.partitionColumns.nonEmpty
=>
-        logWarning {
-          "Persisting partitioned data source relation into Hive metastore in " +
-            s"Spark SQL specific format, which is NOT compatible with Hive.  Input path(s):
" +
-            relation.paths.mkString("\n", "\n", "")
-        }
-        newSparkSQLSpecificMetastoreTable()
+        val message =
+          s"Persisting partitioned data source relation $qualifiedTableName into " +
+            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
" +
+            "Input path(s): " + relation.paths.mkString("\n", "\n", "")
+        (None, message)
 
       case (Some(serde), relation: HadoopFsRelation) =>
-        logWarning {
-          "Persisting data source relation with multiple input paths into Hive metastore
in " +
-            s"Spark SQL specific format, which is NOT compatible with Hive.  Input paths:
" +
-            relation.paths.mkString("\n", "\n", "")
-        }
-        newSparkSQLSpecificMetastoreTable()
+        val message =
+          s"Persisting data source relation $qualifiedTableName with multiple input paths
into " +
+            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
" +
+            s"Input paths: " + relation.paths.mkString("\n", "\n", "")
+        (None, message)
 
       case (Some(serde), _) =>
-        logWarning {
-          s"Data source relation is not a ${classOf[HadoopFsRelation].getSimpleName}. " +
-            "Persisting it into Hive metastore in Spark SQL specific format, " +
-            "which is NOT compatible with Hive."
-        }
-        newSparkSQLSpecificMetastoreTable()
+        val message =
+          s"Data source relation $qualifiedTableName is not a " +
+            s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore
" +
+            "in Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
 
       case _ =>
-        logWarning {
+        val message =
           s"Couldn't find corresponding Hive SerDe for data source provider $provider. "
+
-            "Persisting data source relation into Hive metastore in Spark SQL specific format,
" +
-            "which is NOT compatible with Hive."
-        }
-        newSparkSQLSpecificMetastoreTable()
+            s"Persisting data source relation $qualifiedTableName into Hive metastore in
" +
+            s"Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
     }
 
-    client.createTable(hiveTable)
+    (hiveCompitiableTable, logMessage) match {
+      case (Some(table), message) =>
+        // We first try to save the metadata of the table in a Hive compatiable way.
+        // If Hive throws an error, we fall back to save its metadata in the Spark SQL
+        // specific way.
+        try {
+          logInfo(message)
+          client.createTable(table)
+        } catch {
+          case throwable: Throwable =>
+            val warningMessage =
+              s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting
" +
+                s"it into Hive metastore in Spark SQL specific format."
+            logWarning(warningMessage, throwable)
+            val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable()
+            client.createTable(sparkSqlSpecificTable)
+        }
+
+      case (None, message) =>
+        logWarning(message)
+        val hiveTable = newSparkSQLSpecificMetastoreTable()
+        client.createTable(hiveTable)
+    }
   }
 
   def hiveDefaultTableFilePath(tableName: String): String = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message