spark-commits mailing list archives

From yh...@apache.org
Subject spark git commit: [SPARK-13719][SQL] Parse JSON rows having an array type and a struct type in the same field
Date Thu, 17 Mar 2016 01:20:34 GMT
Repository: spark
Updated Branches:
  refs/heads/master ca9ef86c8 -> 917f4000b


[SPARK-13719][SQL] Parse JSON rows having an array type and a struct type in the same field

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/2400 added support for parsing JSON rows wrapped in an array. However, this throws an exception when the given data contains array data and struct data in the same field, as below:

```json
{"a": {"b": 1}}
{"a": []}
```

and the schema is given as below:

```scala
val schema =
  StructType(
    StructField("a", StructType(
      StructField("b", StringType) :: Nil
    )) :: Nil)
```
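
The Before/After snippets below read from a `path`; for a self-contained reproduction, a hypothetical setup like the following (plain JDK I/O, not part of the PR) could produce the input file:

```scala
import java.nio.file.{Files, Paths}

// Hypothetical setup (not from the PR): write the two records to a temp
// file so the reads in the Before/After snippets below have a `path`.
val dataDir = Files.createTempDirectory("spark-13719")
val path = dataDir.resolve("data.json").toString
Files.write(Paths.get(path), java.util.Arrays.asList(
  """{"a": {"b": 1}}""",
  """{"a": []}"""))
```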

- **Before**

```scala
sqlContext.read.schema(schema).json(path).show()
```

```scala
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure:
Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 10,
192.168.1.170): java.lang.ClassCastException: org.apache.spark.sql.types.GenericArrayData
cannot be cast to org.apache.spark.sql.catalyst.InternalRow
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:50)
	at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:247)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
...
```

- **After**

```scala
sqlContext.read.schema(schema).json(path).show()
```

```bash
+----+
|   a|
+----+
| [1]|
|null|
+----+
```

For other data types, such mismatched values are simply converted to `null`; only this case
throws an exception.
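
For instance (illustrating the behavior described above, not a captured run), a scalar value where the schema expects a struct:

```json
{"a": "not an object"}
```

would be read back as a row with `null` for `a` rather than failing the task.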

This PR makes the array-wrapping support apply only at the top level of each record.
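
Conceptually, the fix splits conversion into a root-level entry point, which alone handles the SPARK-3308 wrapping cases, and a nested converter, which no longer does. Below is a minimal sketch of that dispatch; the `JsonToken` and `DataType` stand-ins are simplified for illustration and are not the actual Jackson/Spark classes used in `JacksonParser.scala`:

```scala
// Simplified stand-in types, for illustration only.
sealed trait JsonToken
case object StartArray extends JsonToken
case object StartObject extends JsonToken

sealed trait DataType
case class StructType(fieldNames: Seq[String]) extends DataType
case class ArrayType(elementType: DataType) extends DataType

object RootDispatchSketch {
  // Root-level conversion: only here may a top-level array be unwrapped
  // into rows, or a lone object be wrapped into a one-element list
  // (the SPARK-3308 behavior).
  def convertRootField(token: JsonToken, schema: DataType): String =
    (token, schema) match {
      case (StartArray, _: StructType) => "read each array element as a row"
      case (StartObject, ArrayType(_)) => "wrap the single object in a list"
      case _                           => convertField(token, schema)
    }

  // Nested conversion: the wrapping cases are gone, so a nested
  // array/struct mismatch now falls through to the default instead of
  // being reinterpreted as wrapped rows.
  def convertField(token: JsonToken, schema: DataType): String =
    (token, schema) match {
      case (StartObject, _: StructType) => "convert the object to a struct"
      case (StartArray, ArrayType(et))  => s"convert elements as $et"
      case _                            => "null (value does not match schema)"
    }
}
```

With this split, both records in the example above enter through the root path, and the nested `{"a": []}` mismatch yields `null` instead of a `ClassCastException`.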

## How was this patch tested?

Unit tests were added, and `./dev/run_tests` was run for code style checks.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #11752 from HyukjinKwon/SPARK-3308-follow-up.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/917f4000
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/917f4000
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/917f4000

Branch: refs/heads/master
Commit: 917f4000b418ee0997551d4dfabb369c4e9243a9
Parents: ca9ef86
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Wed Mar 16 18:20:30 2016 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Wed Mar 16 18:20:30 2016 -0700

----------------------------------------------------------------------
 .../datasources/json/JSONRelation.scala         |  1 -
 .../datasources/json/JacksonParser.scala        | 37 +++++++++++++-------
 .../execution/datasources/json/JsonSuite.scala  | 19 +++++++++-
 .../datasources/json/TestJsonData.scala         |  5 +++
 4 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/917f4000/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 3fa5ebf..751d78d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.CharArrayWriter
 
 import com.fasterxml.jackson.core.JsonFactory
-import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}

http://git-wip-us.apache.org/repos/asf/spark/blob/917f4000/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index b2f5c1e..3252b6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -49,8 +49,31 @@ object JacksonParser {
 
   /**
    * Parse the current token (and related children) according to a desired schema
+   * This is a wrapper for the method `convertField()` to handle a row wrapped
+   * with an array.
    */
-  def convertField(
+  def convertRootField(
+        factory: JsonFactory,
+        parser: JsonParser,
+        schema: DataType): Any = {
+    import com.fasterxml.jackson.core.JsonToken._
+    (parser.getCurrentToken, schema) match {
+      case (START_ARRAY, st: StructType) =>
+        // SPARK-3308: support reading top level JSON arrays and take every element
+        // in such an array as a row
+        convertArray(factory, parser, st)
+
+      case (START_OBJECT, ArrayType(st, _)) =>
+        // the business end of SPARK-3308:
+        // when an object is found but an array is requested just wrap it in a list
+        convertField(factory, parser, st) :: Nil
+
+      case _ =>
+        convertField(factory, parser, schema)
+    }
+  }
+
+  private def convertField(
       factory: JsonFactory,
       parser: JsonParser,
       schema: DataType): Any = {
@@ -157,19 +180,9 @@ object JacksonParser {
       case (START_OBJECT, st: StructType) =>
         convertObject(factory, parser, st)
 
-      case (START_ARRAY, st: StructType) =>
-        // SPARK-3308: support reading top level JSON arrays and take every element
-        // in such an array as a row
-        convertArray(factory, parser, st)
-
       case (START_ARRAY, ArrayType(st, _)) =>
         convertArray(factory, parser, st)
 
-      case (START_OBJECT, ArrayType(st, _)) =>
-        // the business end of SPARK-3308:
-        // when an object is found but an array is requested just wrap it in a list
-        convertField(factory, parser, st) :: Nil
-
       case (START_OBJECT, MapType(StringType, kt, _)) =>
         convertMap(factory, parser, kt)
 
@@ -264,7 +277,7 @@ object JacksonParser {
           Utils.tryWithResource(factory.createParser(record)) { parser =>
             parser.nextToken()
 
-            convertField(factory, parser, schema) match {
+            convertRootField(factory, parser, schema) match {
               case null => failedRecord(record)
               case row: InternalRow => row :: Nil
               case array: ArrayData =>

http://git-wip-us.apache.org/repos/asf/spark/blob/917f4000/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 4a8c128..6d942c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -65,7 +65,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
       Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
         parser.nextToken()
-        JacksonParser.convertField(factory, parser, dataType)
+        JacksonParser.convertRootField(factory, parser, dataType)
       }
     }
 
@@ -1426,6 +1426,23 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("Parse JSON rows having an array type and a struct type in the same field.") {
+    withTempDir { dir =>
+      val dir = Utils.createTempDir()
+      dir.delete()
+      val path = dir.getCanonicalPath
+      arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+      val schema =
+        StructType(
+          StructField("a", StructType(
+            StructField("b", StringType) :: Nil
+          )) :: Nil)
+      val jsonDF = sqlContext.read.schema(schema).json(path)
+      assert(jsonDF.count() == 2)
+    }
+  }
+
   test("SPARK-12872 Support to specify the option for compression codec") {
     withTempDir { dir =>
       val dir = Utils.createTempDir()

http://git-wip-us.apache.org/repos/asf/spark/blob/917f4000/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index a083605..b2eff81 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -209,6 +209,11 @@ private[json] trait TestJsonData {
     sqlContext.sparkContext.parallelize(
       """{"ts":1451732645}""" :: Nil)
 
+  def arrayAndStructRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      """{"a": {"b": 1}}""" ::
+      """{"a": []}""" :: Nil)
+
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

