spark-commits mailing list archives

From: wenc...@apache.org
Subject: spark git commit: [SPARK-14387][SPARK-16628][SPARK-18355][SQL] Use Spark schema to read ORC table instead of ORC file schema
Date: Fri, 13 Oct 2017 15:12:38 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c9187db80 -> 30d5c9fd8


[SPARK-14387][SPARK-16628][SPARK-18355][SQL] Use Spark schema to read ORC table instead of ORC file schema

Before Hive 2.0, the ORC file schema contains invalid column names like `_col1` and `_col2`.
This is a well-known limitation, and several Apache Spark issues have been filed against
`spark.sql.hive.convertMetastoreOrc=true` because of it. This PR ignores the ORC file schema
and uses the Spark schema instead.
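
For context, a minimal, self-contained sketch (not the actual Spark code) of the lookup this change implies: when an ORC file written by Hive before 2.0 only carries positional names (`_col0`, `_col1`, ...), a requested column is resolved by falling back from its real name to `_col<ordinal>`, with the ordinal taken from the Spark (metastore) schema. The object name and example schemas below are purely illustrative.

```scala
// Minimal sketch (not the actual Spark code) of the name-to-position fallback described above.
// Assumed inputs: a physical ORC schema with placeholder names (as written by Hive < 2.0)
// and the Spark/metastore schema carrying the real column names.
object OrcColumnLookupSketch {
  // Hypothetical column names read from an old ORC file footer.
  val orcFileColumns: Seq[String] = Seq("_col0", "_col1", "_col2")

  // Spark (metastore) schema for the same table.
  val sparkSchema: Seq[String] = Seq("click_id", "search_id", "uid")

  // Resolve a requested column against the file schema; fall back to "_col<ordinal>"
  // when the real name is absent, mirroring the getStructFieldRef fallback in the diff.
  def resolve(requestedName: String): Option[String] =
    orcFileColumns.find(_ == requestedName).orElse {
      val ordinal = sparkSchema.indexOf(requestedName)
      if (ordinal >= 0) orcFileColumns.lift(ordinal) else None
    }

  def main(args: Array[String]): Unit = {
    println(resolve("search_id")) // Some(_col1): resolved positionally via the Spark schema
    println(resolve("dummy"))     // None: column added after the file was written
  }
}
```

In the patch itself this fallback lives in `unwrapOrcStructs`; when even the positional lookup fails (e.g. a column added via ALTER TABLE after the file was written), the field ref stays `null` and the corresponding row position is set to null, which the new test's `dummy` column exercises.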

Passes the newly added test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19470 from dongjoon-hyun/SPARK-18355.

(cherry picked from commit e6e36004afc3f9fc8abea98542248e9de11b4435)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30d5c9fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30d5c9fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30d5c9fd

Branch: refs/heads/branch-2.2
Commit: 30d5c9fd8ae1944a94ddedae83433368a02e55e6
Parents: c9187db
Author: Dongjoon Hyun <dongjoon@apache.org>
Authored: Fri Oct 13 23:09:12 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Fri Oct 13 23:11:50 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/orc/OrcFileFormat.scala      | 31 ++++++----
 .../sql/hive/execution/SQLQuerySuite.scala      | 62 +++++++++++++++++++-
 2 files changed, 80 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/30d5c9fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 6b76cfa..54e8f82 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -134,12 +134,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
       // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
       // using the given physical schema. Instead, we simply return an empty iterator.
-      val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))
-      if (maybePhysicalSchema.isEmpty) {
+      val isEmptyFile = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)).isEmpty
+      if (isEmptyFile) {
         Iterator.empty
       } else {
-        val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
+        OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema)
 
         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -163,6 +162,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         OrcRelation.unwrapOrcStructs(
           conf,
+          dataSchema,
           requiredSchema,
           Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
           recordsIterator)
@@ -272,25 +272,32 @@ private[orc] object OrcRelation extends HiveInspectors {
   def unwrapOrcStructs(
       conf: Configuration,
       dataSchema: StructType,
+      requiredSchema: StructType,
       maybeStructOI: Option[StructObjectInspector],
       iterator: Iterator[Writable]): Iterator[InternalRow] = {
     val deserializer = new OrcSerde
-    val mutableRow = new SpecificInternalRow(dataSchema.map(_.dataType))
-    val unsafeProjection = UnsafeProjection.create(dataSchema)
+    val mutableRow = new SpecificInternalRow(requiredSchema.map(_.dataType))
+    val unsafeProjection = UnsafeProjection.create(requiredSchema)
 
     def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = {
-      val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map {
-        case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal
+      val (fieldRefs, fieldOrdinals) = requiredSchema.zipWithIndex.map {
+        case (field, ordinal) =>
+          var ref = oi.getStructFieldRef(field.name)
+          if (ref == null) {
+            ref = oi.getStructFieldRef("_col" + dataSchema.fieldIndex(field.name))
+          }
+          ref -> ordinal
       }.unzip
 
-      val unwrappers = fieldRefs.map(unwrapperFor)
+      val unwrappers = fieldRefs.map(r => if (r == null) null else unwrapperFor(r))
 
       iterator.map { value =>
         val raw = deserializer.deserialize(value)
         var i = 0
         val length = fieldRefs.length
         while (i < length) {
-          val fieldValue = oi.getStructFieldData(raw, fieldRefs(i))
+          val fieldRef = fieldRefs(i)
+          val fieldValue = if (fieldRef == null) null else oi.getStructFieldData(raw, fieldRef)
           if (fieldValue == null) {
             mutableRow.setNullAt(fieldOrdinals(i))
           } else {
@@ -306,8 +313,8 @@ private[orc] object OrcRelation extends HiveInspectors {
   }
 
   def setRequiredColumns(
-      conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
-    val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
+      conf: Configuration, dataSchema: StructType, requestedSchema: StructType): Unit = {
+    val ids = requestedSchema.map(a => dataSchema.fieldIndex(a.name): Integer)
     val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
     HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/30d5c9fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 31b36f1..3d1027a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -2034,4 +2034,64 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       assert(setOfPath.size() == pathSizeToDeleteOnExit)
     }
   }
+
+  Seq("orc", "parquet").foreach { format =>
+    test(s"SPARK-18355 Read data from a hive table with a new column - $format") {
+      val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
+      Seq("true", "false").foreach { value =>
+        withSQLConf(
+          HiveUtils.CONVERT_METASTORE_ORC.key -> value,
+          HiveUtils.CONVERT_METASTORE_PARQUET.key -> value) {
+          withTempDatabase { db =>
+            client.runSqlHive(
+              s"""
+                 |CREATE TABLE $db.t(
+                 |  click_id string,
+                 |  search_id string,
+                 |  uid bigint)
+                 |PARTITIONED BY (
+                 |  ts string,
+                 |  hour string)
+                 |STORED AS $format
+              """.stripMargin)
+
+            client.runSqlHive(
+              s"""
+                 |INSERT INTO TABLE $db.t
+                 |PARTITION (ts = '98765', hour = '01')
+                 |VALUES (12, 2, 12345)
+              """.stripMargin
+            )
+
+            checkAnswer(
+              sql(s"SELECT click_id, search_id, uid, ts, hour FROM $db.t"),
+              Row("12", "2", 12345, "98765", "01"))
+
+            client.runSqlHive(s"ALTER TABLE $db.t ADD COLUMNS (dummy string)")
+
+            checkAnswer(
+              sql(s"SELECT click_id, search_id FROM $db.t"),
+              Row("12", "2"))
+
+            checkAnswer(
+              sql(s"SELECT search_id, click_id FROM $db.t"),
+              Row("2", "12"))
+
+            checkAnswer(
+              sql(s"SELECT search_id FROM $db.t"),
+              Row("2"))
+
+            checkAnswer(
+              sql(s"SELECT dummy, click_id FROM $db.t"),
+              Row(null, "12"))
+
+            checkAnswer(
+              sql(s"SELECT click_id, search_id, uid, dummy, ts, hour FROM $db.t"),
+              Row("12", "2", 12345, null, "98765", "01"))
+          }
+        }
+      }
+    }
+  }
 }



