spark-commits mailing list archives

From marmb...@apache.org
Subject spark git commit: [SPARK-6010] [SQL] Merging compatible Parquet schemas before computing splits
Date Wed, 25 Feb 2015 23:15:25 GMT
Repository: spark
Updated Branches:
  refs/heads/master f3f4c87b3 -> e0fdd467e


[SPARK-6010] [SQL] Merging compatible Parquet schemas before computing splits

`ReadContext.init` calls `InitContext.getMergedKeyValueMetadata`, which doesn't know how to
merge conflicting user-defined key-value metadata and throws an exception. In our case, when
dealing with different but compatible schemas, different Parquet part-files carry different
Spark SQL schema JSON strings, and this is what triggers the problem. Reading similar Parquet
files generated by Hive doesn't suffer from this issue.
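
For illustration, the conflicting metadata looks roughly like this (a sketch with hypothetical
literal values; the key name mirrors `RowReadSupport.SPARK_METADATA_KEY`):

```scala
// Sketch with hypothetical values: two compatible part-files that carry
// different Spark SQL schema JSON strings under the same metadata key.
val schemaOfPart1 =
  """{"type":"struct","fields":[
    |  {"name":"intField","type":"integer","nullable":true,"metadata":{}}]}""".stripMargin

val schemaOfPart2 =
  """{"type":"struct","fields":[
    |  {"name":"intField","type":"integer","nullable":true,"metadata":{}},
    |  {"name":"stringField","type":"string","nullable":true,"metadata":{}}]}""".stripMargin

// Parquet collects key-value metadata across all footers; for the Spark
// schema key it now sees two distinct values, and the strict merge in
// InitContext.getMergedKeyValueMetadata throws instead of reconciling them.
val collected = Set(schemaOfPart1, schemaOfPart2)
assert(collected.size == 2)
```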

In this PR, we manually merge the schemas before passing the merged metadata to `ReadContext`,
which avoids the exception.
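
The merge itself takes the union of the two field lists, which is what makes the schemas above
compatible. As a rough, self-contained sketch of those semantics (the actual fix below reduces
the real `StructType`s with their `merge` method; `Field` and `mergeSchemas` here are
illustrative stand-ins):

```scala
// Stand-in for StructField, just enough to show the union semantics.
case class Field(name: String, dataType: String, nullable: Boolean)

def mergeSchemas(left: Seq[Field], right: Seq[Field]): Seq[Field] = {
  val rightByName = right.map(f => f.name -> f).toMap
  val merged = left.map { lf =>
    rightByName.get(lf.name) match {
      case Some(rf) if rf.dataType == lf.dataType =>
        // Present on both sides with the same type: keep one copy.
        lf.copy(nullable = lf.nullable || rf.nullable)
      case Some(rf) =>
        sys.error(s"Conflicting types for ${lf.name}: ${lf.dataType} vs ${rf.dataType}")
      case None => lf // only on the left: keep as-is
    }
  }
  val leftNames = left.map(_.name).toSet
  // Fields only on the right are appended; rows from the other part-files
  // read back as null for these columns.
  merged ++ right.filterNot(f => leftNames(f.name))
}

val part1 = Seq(Field("intField", "integer", nullable = false))
val part2 = Seq(
  Field("intField", "integer", nullable = false),
  Field("stringField", "string", nullable = true))

// => List(Field(intField,integer,false), Field(stringField,string,true)),
// matching the merged schema exercised by the new test case below.
println(mergeSchemas(part1, part2))
```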


Author: Cheng Lian <lian@databricks.com>

Closes #4768 from liancheng/spark-6010 and squashes the following commits:

9002f0a [Cheng Lian] Fixes SPARK-6010


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0fdd467
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0fdd467
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0fdd467

Branch: refs/heads/master
Commit: e0fdd467e277867d6bec5c6605cc1cabce70ac89
Parents: f3f4c87
Author: Cheng Lian <lian@databricks.com>
Authored: Wed Feb 25 15:15:22 2015 -0800
Committer: Michael Armbrust <michael@databricks.com>
Committed: Wed Feb 25 15:15:22 2015 -0800

----------------------------------------------------------------------
 .../sql/parquet/ParquetTableOperations.scala    | 20 ++++++++++++++++++-
 .../apache/spark/sql/parquet/ParquetTest.scala  |  5 +++++
 .../ParquetPartitionDiscoverySuite.scala        | 21 ++++++++++++++++++++
 3 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e0fdd467/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 6596645..4dc13b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -48,6 +48,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row, _}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 
 /**
@@ -459,13 +460,30 @@ private[parquet] class FilteringParquetRowInputFormat
     val getGlobalMetaData =
       classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]])
     getGlobalMetaData.setAccessible(true)
-    val globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]
+    var globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]
 
     if (globalMetaData == null) {
      val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
      return splits
     }
 
+    Option(globalMetaData.getKeyValueMetaData.get(RowReadSupport.SPARK_METADATA_KEY)).foreach {
+      schemas =>
+        val mergedSchema = schemas
+          .map(DataType.fromJson(_).asInstanceOf[StructType])
+          .reduce(_ merge _)
+          .json
+
+        val mergedMetadata = globalMetaData
+          .getKeyValueMetaData
+          .updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(mergedSchema)))
+
+        globalMetaData = new GlobalMetaData(
+          globalMetaData.getSchema,
+          mergedMetadata,
+          globalMetaData.getCreatedBy)
+    }
+
     val readContext = getReadSupport(configuration).init(
       new InitContext(configuration,
         globalMetaData.getKeyValueMetaData,

http://git-wip-us.apache.org/repos/asf/spark/blob/e0fdd467/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index 0fa2fe9..d6ea667 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -131,6 +131,11 @@ private[sql] trait ParquetTest {
     data.toDF().save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
   }
 
+  protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
+      df: DataFrame, path: File): Unit = {
+    df.save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
+  }
+
   protected def makePartitionDir(
       basePath: File,
       defaultPartitionName: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/e0fdd467/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 3bf0116..adb3c93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -36,6 +36,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
   override val sqlContext: SQLContext = TestSQLContext
 
   import sqlContext._
+  import sqlContext.implicits._
 
   val defaultPartitionName = "__NULL__"
 
@@ -319,4 +320,24 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       }
     }
   }
+
+  test("read partitioned table - merging compatible schemas") {
+    withTempDir { base =>
+      makeParquetFile(
+        (1 to 10).map(i => Tuple1(i)).toDF("intField"),
+        makePartitionDir(base, defaultPartitionName, "pi" -> 1))
+
+      makeParquetFile(
+        (1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"),
+        makePartitionDir(base, defaultPartitionName, "pi" -> 2))
+
+      load(base.getCanonicalPath, "org.apache.spark.sql.parquet").registerTempTable("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          (1 to 10).map(i => Row(i, null, 1)) ++ (1 to 10).map(i => Row(i, i.toString, 2)))
+      }
+    }
+  }
 }

