spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-13070][SQL] Better error message when Parquet schema merging fails
Date Sun, 31 Jan 2016 07:02:56 GMT
Repository: spark
Updated Branches:
  refs/heads/master de2837199 -> a1303de0a


[SPARK-13070][SQL] Better error message when Parquet schema merging fails

Make sure we throw better error messages when Parquet schema merging fails.

Author: Cheng Lian <lian@databricks.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #10979 from viirya/schema-merging-failure-message.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1303de0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1303de0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1303de0

Branch: refs/heads/master
Commit: a1303de0a0e9d0c80327977abf52a79e2aa95e1f
Parents: de28371
Author: Cheng Lian <lian@databricks.com>
Authored: Sat Jan 30 23:02:49 2016 -0800
Committer: Reynold Xin <rxin@databricks.com>
Committed: Sat Jan 30 23:02:49 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/types/StructType.scala |  6 ++--
 .../datasources/parquet/ParquetRelation.scala   | 33 +++++++++++++++++---
 .../parquet/ParquetFilterSuite.scala            | 15 +++++++++
 .../parquet/ParquetSchemaSuite.scala            | 30 ++++++++++++++++++
 4 files changed, 77 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a1303de0/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index da0c928..c9e7e7f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -424,13 +424,13 @@ object StructType extends AbstractDataType {
         if ((leftPrecision == rightPrecision) && (leftScale == rightScale)) {
           DecimalType(leftPrecision, leftScale)
        } else if ((leftPrecision != rightPrecision) && (leftScale != rightScale)) {
-          throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          throw new SparkException("Failed to merge decimal types with incompatible " +
             s"precision $leftPrecision and $rightPrecision & scale $leftScale and $rightScale")
         } else if (leftPrecision != rightPrecision) {
-          throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          throw new SparkException("Failed to merge decimal types with incompatible " +
             s"precision $leftPrecision and $rightPrecision")
         } else {
-          throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          throw new SparkException("Failed to merge decimal types with incompatible " +
             s"scala $leftScale and $rightScale")
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a1303de0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index f875900..1e686d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -800,12 +800,37 @@ private[sql] object ParquetRelation extends Logging {
               assumeInt96IsTimestamp = assumeInt96IsTimestamp,
               writeLegacyParquetFormat = writeLegacyParquetFormat)
 
-          footers.map { footer =>
-            ParquetRelation.readSchemaFromFooter(footer, converter)
-          }.reduceLeftOption(_ merge _).iterator
+          if (footers.isEmpty) {
+            Iterator.empty
+          } else {
+            var mergedSchema = ParquetRelation.readSchemaFromFooter(footers.head, converter)
+            footers.tail.foreach { footer =>
+              val schema = ParquetRelation.readSchemaFromFooter(footer, converter)
+              try {
+                mergedSchema = mergedSchema.merge(schema)
+              } catch { case cause: SparkException =>
+                throw new SparkException(
+                  s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
+              }
+            }
+            Iterator.single(mergedSchema)
+          }
         }.collect()
 
-    partiallyMergedSchemas.reduceLeftOption(_ merge _)
+    if (partiallyMergedSchemas.isEmpty) {
+      None
+    } else {
+      var finalSchema = partiallyMergedSchemas.head
+      partiallyMergedSchemas.tail.foreach { schema =>
+        try {
+          finalSchema = finalSchema.merge(schema)
+        } catch { case cause: SparkException =>
+          throw new SparkException(
+            s"Failed merging schema:\n${schema.treeString}", cause)
+        }
+      }
+      Some(finalSchema)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a1303de0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 1796b3a..3ded32c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -421,6 +421,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         // We will remove the temporary metadata when writing Parquet file.
         val forPathSix = sqlContext.read.parquet(pathSix).schema
         assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+        // sanity test: make sure optional metadata field is not wrongly set.
+        val pathSeven = s"${dir.getCanonicalPath}/table7"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
+        val pathEight = s"${dir.getCanonicalPath}/table8"
+        (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
+
+        val df2 = sqlContext.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
+        checkAnswer(
+          df2,
+          Row(1, "1"))
+
+        // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
+        assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
+        assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1303de0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 60fa81b..d860651 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.parquet.schema.MessageTypeParser
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -449,6 +450,35 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }.getMessage.contains("detected conflicting schemas"))
   }
 
+  test("schema merging failure error message") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      sqlContext.range(3).write.parquet(s"$path/p=1")
+      sqlContext.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
+
+      val message = intercept[SparkException] {
+        sqlContext.read.option("mergeSchema", "true").parquet(path).schema
+      }.getMessage
+
+      assert(message.contains("Failed merging schema of file"))
+    }
+
+    // test for second merging (after read Parquet schema in parallel done)
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      sqlContext.range(3).write.parquet(s"$path/p=1")
+      sqlContext.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
+
+      sqlContext.sparkContext.conf.set("spark.default.parallelism", "20")
+
+      val message = intercept[SparkException] {
+        sqlContext.read.option("mergeSchema", "true").parquet(path).schema
+      }.getMessage
+
+      assert(message.contains("Failed merging schema:"))
+    }
+  }
+
   // =======================================================
   // Tests for converting Parquet LIST to Catalyst ArrayType
   // =======================================================


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message