spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-23173][SQL] Avoid creating corrupt parquet files when loading data from JSON
Date Fri, 09 Mar 2018 22:29:53 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 3ec25d5a8 -> b083bd107


[SPARK-23173][SQL] Avoid creating corrupt parquet files when loading data from JSON

## What changes were proposed in this pull request?

The from_json() function accepts an additional parameter, where the user might specify the
schema. The issue is that the specified schema might not be compatible with data. In particular,
the JSON data might be missing data for fields declared as non-nullable in the schema. The
from_json() function does not verify the data against such errors. When data with missing
fields is sent to the parquet encoder, there is no verification either. The end result is
a corrupt parquet file.

To avoid corruption, this patch forces all fields in the user-specified schema to be
nullable.
Since this changes the behavior of a public function, we need to include it in release notes.
The behavior can be reverted by setting `spark.sql.fromJsonForceNullableSchema=false`.

## How was this patch tested?

Added two new tests.

Author: Michał Świtakowski <michal.switakowski@databricks.com>

Closes #20694 from mswit-databricks/SPARK-23173.

(cherry picked from commit 2ca9bb083c515511d2bfee271fc3e0269aceb9d5)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b083bd10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b083bd10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b083bd10

Branch: refs/heads/branch-2.3
Commit: b083bd107d25bd3f7a4cdcf3aafa07b9895878b6
Parents: 3ec25d5
Author: Michał Świtakowski <michal.switakowski@databricks.com>
Authored: Fri Mar 9 14:29:31 2018 -0800
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Fri Mar 9 14:29:49 2018 -0800

----------------------------------------------------------------------
 .../catalyst/expressions/jsonExpressions.scala  | 22 ++++++++------
 .../org/apache/spark/sql/internal/SQLConf.scala |  8 ++++++
 .../expressions/JsonExpressionsSuite.scala      | 30 +++++++++++++++++++-
 .../datasources/parquet/ParquetIOSuite.scala    | 19 +++++++++++++
 4 files changed, 70 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b083bd10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 18b4fed..fdd672c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.json._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException,
FailFastMode, GenericArrayData, MapData}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -515,10 +516,15 @@ case class JsonToStructs(
     child: Expression,
     timeZoneId: Option[String] = None)
   extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes
{
-  override def nullable: Boolean = true
 
-  def this(schema: DataType, options: Map[String, String], child: Expression) =
-    this(schema, options, child, None)
+  val forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)
+
+  // The JSON input data might be missing certain fields. We force the nullability
+  // of the user-provided schema to avoid data corruptions. In particular, the parquet-mr
encoder
+  // can generate incorrect files if values are missing in columns declared as non-nullable.
+  val nullableSchema = if (forceNullableSchema) schema.asNullable else schema
+
+  override def nullable: Boolean = true
 
   // Used in `FunctionRegistry`
   def this(child: Expression, schema: Expression) =
@@ -535,22 +541,22 @@ case class JsonToStructs(
       child = child,
       timeZoneId = None)
 
-  override def checkInputDataTypes(): TypeCheckResult = schema match {
+  override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
     case _: StructType | ArrayType(_: StructType, _) =>
       super.checkInputDataTypes()
     case _ => TypeCheckResult.TypeCheckFailure(
-      s"Input schema ${schema.simpleString} must be a struct or an array of structs.")
+      s"Input schema ${nullableSchema.simpleString} must be a struct or an array of structs.")
   }
 
   @transient
-  lazy val rowSchema = schema match {
+  lazy val rowSchema = nullableSchema match {
     case st: StructType => st
     case ArrayType(st: StructType, _) => st
   }
 
   // This converts parsed rows to the desired output by the given schema.
   @transient
-  lazy val converter = schema match {
+  lazy val converter = nullableSchema match {
     case _: StructType =>
       (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
     case ArrayType(_: StructType, _) =>
@@ -563,7 +569,7 @@ case class JsonToStructs(
       rowSchema,
       new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
 
-  override def dataType: DataType = schema
+  override def dataType: DataType = nullableSchema
 
   override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
     copy(timeZoneId = Option(timeZoneId))

http://git-wip-us.apache.org/repos/asf/spark/blob/b083bd10/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 47fcf34..eebb4c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -481,6 +481,14 @@ object SQLConf {
     .stringConf
     .createWithDefault("_corrupt_record")
 
+  val FROM_JSON_FORCE_NULLABLE_SCHEMA = buildConf("spark.sql.fromJsonForceNullableSchema")
+    .internal()
+    .doc("When true, force the output schema of the from_json() function to be nullable "
+
+      "(including all the fields). Otherwise, the schema might not be compatible with" +
+      "actual data, which leads to curruptions.")
+    .booleanConf
+    .createWithDefault(true)
+
   val BROADCAST_TIMEOUT = buildConf("spark.sql.broadcastTimeout")
     .doc("Timeout in seconds for the broadcast wait time in broadcast joins.")
     .timeConf(TimeUnit.SECONDS)

http://git-wip-us.apache.org/repos/asf/spark/blob/b083bd10/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index a0bbe02..7812319 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -22,11 +22,13 @@ import java.util.Calendar
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.plans.PlanTestBase
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeTestUtils, DateTimeUtils,
GenericArrayData, PermissiveMode}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
-class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
+class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with PlanTestBase
{
   val json =
     """
       |{"store":{"fruit":[{"weight":8,"type":"apple"},{"weight":9,"type":"pear"}],
@@ -680,4 +682,30 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
{
       )
     }
   }
+
+  test("from_json missing fields") {
+    for (forceJsonNullableSchema <- Seq(false, true)) {
+      withSQLConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA.key -> forceJsonNullableSchema.toString)
{
+        val input =
+          """{
+            |  "a": 1,
+            |  "c": "foo"
+            |}
+            |""".stripMargin
+        val jsonSchema = new StructType()
+          .add("a", LongType, nullable = false)
+          .add("b", StringType, nullable = false)
+          .add("c", StringType, nullable = false)
+        val output = InternalRow(1L, null, UTF8String.fromString("foo"))
+        checkEvaluation(
+          JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType), gmtId),
+          output
+        )
+        val schema = JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType),
gmtId)
+          .dataType
+        val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable else jsonSchema
+        assert(schemaToCompare == schema)
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b083bd10/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index f3ece5b..e4e0e6e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -771,6 +772,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext
{
       assert(option.compressionCodecClassName == "UNCOMPRESSED")
     }
   }
+
+  test("SPARK-23173 Writing a file with data converted from JSON with and incorrect user
schema") {
+    withTempPath { file =>
+      val jsonData =
+        """{
+        |  "a": 1,
+        |  "c": "foo"
+        |}
+        |""".stripMargin
+      val jsonSchema = new StructType()
+        .add("a", LongType, nullable = false)
+        .add("b", StringType, nullable = false)
+        .add("c", StringType, nullable = false)
+      spark.range(1).select(from_json(lit(jsonData), jsonSchema) as "input")
+        .write.parquet(file.getAbsolutePath)
+      checkAnswer(spark.read.parquet(file.getAbsolutePath), Seq(Row(Row(1, null, "foo"))))
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message