spark-commits mailing list archives

From samee...@apache.org
Subject spark git commit: [SPARK-21769][SQL] Add a table-specific option for always respecting schemas inferred/controlled by Spark SQL
Date Tue, 22 Aug 2017 20:13:17 GMT
Repository: spark
Updated Branches:
  refs/heads/master 43d71d965 -> 01a8e4627


[SPARK-21769][SQL] Add a table-specific option for always respecting schemas inferred/controlled by Spark SQL

## What changes were proposed in this pull request?
For Hive-serde tables, we always respect the schema stored in the Hive metastore, because the
schema could be altered by other engines that share the same metastore. Thus, whenever the two
schemas differ (ignoring nullability and case), we trust the metastore-controlled schema for
Hive-serde tables. However, in some scenarios the Hive metastore can also INCORRECTLY overwrite
the schema, e.g. when the table's serde differs from the serde built into the Hive metastore.

The proposed solution is to introduce a table-specific option for such scenarios. For a specific
table, users can make Spark always respect the Spark-inferred/controlled schema instead of
trusting the metastore-controlled one. By default, we trust the Hive metastore-controlled schema.
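
As a minimal sketch (assuming a `spark` session; the DDL mirrors the cross-version test added
below, where `location` and `avroSchema` are defined), a Hive-serde table opts in through its
serde properties:

```scala
// Opt this one table into the Spark-controlled schema; every other table
// keeps the default of trusting the Hive metastore-controlled schema.
spark.sql(
  s"""
     |CREATE TABLE tab1
     |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
     |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
     |STORED AS
     |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
     |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
     |LOCATION '$location'
     |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
   """.stripMargin)
```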

## How was this patch tested?
Added a cross-version test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19003 from gatorsmile/respectSparkSchema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01a8e462
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01a8e462
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01a8e462

Branch: refs/heads/master
Commit: 01a8e46278dbfde916a74b6fd51e08804602e1cf
Parents: 43d71d9
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Tue Aug 22 13:12:59 2017 -0700
Committer: Sameer Agarwal <sameerag@apache.org>
Committed: Tue Aug 22 13:12:59 2017 -0700

----------------------------------------------------------------------
 .../execution/datasources/SourceOptions.scala   |  50 +++++++++++++++++++
 .../spark/sql/hive/HiveExternalCatalog.scala    |  11 ++--
 .../src/test/resources/avroDecimal/decimal.avro | Bin 0 -> 203 bytes
 .../spark/sql/hive/client/VersionsSuite.scala   |  41 +++++++++++++++
 4 files changed, 97 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01a8e462/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala
new file mode 100644
index 0000000..c98c0b2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SourceOptions.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+/**
+ * Options for the data source.
+ */
+class SourceOptions(
+     @transient private val parameters: CaseInsensitiveMap[String])
+  extends Serializable {
+  import SourceOptions._
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  // A flag to disable saving a data source table's metadata in hive compatible way.
+  val skipHiveMetadata: Boolean = parameters
+    .get(SKIP_HIVE_METADATA).map(_.toBoolean).getOrElse(DEFAULT_SKIP_HIVE_METADATA)
+
+  // A flag to always respect the Spark schema restored from the table properties
+  val respectSparkSchema: Boolean = parameters
+    .get(RESPECT_SPARK_SCHEMA).map(_.toBoolean).getOrElse(DEFAULT_RESPECT_SPARK_SCHEMA)
+}
+
+
+object SourceOptions {
+
+  val SKIP_HIVE_METADATA = "skipHiveMetadata"
+  val DEFAULT_SKIP_HIVE_METADATA = false
+
+  val RESPECT_SPARK_SCHEMA = "respectSparkSchema"
+  val DEFAULT_RESPECT_SPARK_SCHEMA = false
+
+}
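
A quick usage sketch of the class above (illustrative only; the auxiliary constructor wraps the
parameters in `CaseInsensitiveMap`, so option names match in any spelling case, and both flags
fall back to `false` when the key is absent):

```scala
import org.apache.spark.sql.execution.datasources.SourceOptions

// Absent keys fall back to the defaults, i.e. false for both flags.
val defaults = new SourceOptions(Map.empty[String, String])
assert(!defaults.skipHiveMetadata && !defaults.respectSparkSchema)

// Keys go through CaseInsensitiveMap, so any casing of the option name works.
val opts = new SourceOptions(Map("RESPECTSPARKSCHEMA" -> "true"))
assert(opts.respectSparkSchema)
```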

http://git-wip-us.apache.org/repos/asf/spark/blob/01a8e462/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index bdbb8bc..34af37c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -260,6 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
     // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
     val provider = table.provider.get
+    val options = new SourceOptions(table.storage.properties)
 
     // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
     // support, no column nullability, etc., we should do some extra works before saving table
@@ -325,11 +326,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val qualifiedTableName = table.identifier.quotedString
     val maybeSerde = HiveSerDe.sourceToSerDe(provider)
-    val skipHiveMetadata = table.storage.properties
-      .getOrElse("skipHiveMetadata", "false").toBoolean
 
     val (hiveCompatibleTable, logMessage) = maybeSerde match {
-      case _ if skipHiveMetadata =>
+      case _ if options.skipHiveMetadata =>
         val message =
           s"Persisting data source table $qualifiedTableName into Hive metastore in" +
             "Spark SQL specific format, which is NOT compatible with Hive."
@@ -737,6 +736,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
+    val options = new SourceOptions(table.storage.properties)
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
       tracksPartitionsInCatalog = true)
@@ -748,7 +748,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
       val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 
-      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) ||
+          options.respectSparkSchema) {
         hiveTable.copy(
           schema = reorderedSchema,
           partitionColumnNames = partColumnNames,
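
Restated outside the diff (a paraphrase, not a verbatim excerpt): the schema restored from the
table properties now wins either when it matches the metastore schema up to case and nullability,
as before, or unconditionally when the table sets `respectSparkSchema`:

```scala
// Paraphrase of the condition above: the opt-in flag forces the
// Spark-controlled schema even if the metastore copy has diverged.
val useSparkSchema =
  DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) ||
  options.respectSparkSchema
```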

http://git-wip-us.apache.org/repos/asf/spark/blob/01a8e462/sql/hive/src/test/resources/avroDecimal/decimal.avro
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/avroDecimal/decimal.avro b/sql/hive/src/test/resources/avroDecimal/decimal.avro
new file mode 100755
index 0000000..6da423f
Binary files /dev/null and b/sql/hive/src/test/resources/avroDecimal/decimal.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/01a8e462/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 072e538..cbbe869 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -763,6 +763,47 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }
 
+    test(s"$version: read avro file containing decimal") {
+      val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+      val location = new File(url.getFile)
+
+      val tableName = "tab1"
+      val avroSchema =
+        """{
+          |  "name": "test_record",
+          |  "type": "record",
+          |  "fields": [ {
+          |    "name": "f0",
+          |    "type": [
+          |      "null",
+          |      {
+          |        "precision": 38,
+          |        "scale": 2,
+          |        "type": "bytes",
+          |        "logicalType": "decimal"
+          |      }
+          |    ]
+          |  } ]
+          |}
+        """.stripMargin
+      withTable(tableName) {
+        versionSpark.sql(
+          s"""
+             |CREATE TABLE $tableName
+             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+             |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+             |STORED AS
+             |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+             |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+             |LOCATION '$location'
+             |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+           """.stripMargin
+        )
+        assert(versionSpark.table(tableName).collect() ===
+          versionSpark.sql("SELECT 1.30").collect())
+      }
+    }
+
     // TODO: add more tests.
   }
 }

