spark-issues mailing list archives

From "Ryan Blue (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-16344) Array of struct with a single field name "element" can't be decoded from Parquet files written by Spark 1.6+
Date Wed, 06 Jul 2016 20:54:11 GMT

    [ https://issues.apache.org/jira/browse/SPARK-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365062#comment-15365062 ]

Ryan Blue commented on SPARK-16344:
-----------------------------------

It looks like the main change is to specifically catch the 3-level name structure,
{{list-name (LIST) -> "list" -> "element"}}. The trouble with this approach is that it
doesn't entirely solve the problem either.

Let me try to give a bit more background. In parquet-avro, there are two {{isElementType}}
methods; one in the schema converter and one in the record converter. The one in the schema
converter will guess whether the Parquet type uses a 3-level list or a 2-level list when it
can't be determined according to the spec's backward-compatibility rules. That guess assumes
a 2-level structure by default and at the next major release will guess a 3-level structure.
(This can be controlled by a property.) But this is only used when the reader doesn't supply
a read schema / expected schema and the code has to convert from Parquet's type to get one.
Ideally, we always have a read schema from the file, from the reader's expected class (if
using Java objects), or from the reader passing in the expected schema. That's why the other
{{isElementType}} method exists: it looks at the expected schema and the file schema to determine
whether the caller has passed in a schema with the extra single-field list/element struct.
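
(For reference, a rough sketch of forcing that default guess from the reading side; the
config key and the direction of the boolean below are recalled from memory, so verify them
against your parquet-avro version before relying on them.)

{code}
// Rough sketch (Scala, spark-shell style): override the schema converter's guess for
// ambiguous lists instead of relying on the default. Property name and boolean direction
// are assumptions recalled from memory.
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.AvroSchemaConverter

val conf = new Configuration()
// Toggles how ambiguous repeated groups are interpreted (2-level vs. 3-level);
// check the AvroSchemaConverter javadoc for which value maps to which.
conf.setBoolean("parquet.avro.add-list-element-records", false)
val converter = new AvroSchemaConverter(conf)
{code}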

That code has to distinguish between two cases for a 3-level list:
1. When the caller expects {{List<OneTuple<ElementType>>}}, with the extra record
layer that was originally returned when Avro only knew about 2-level lists.
2. When the caller expects {{List<ElementType>}}, without an extra layer.

The code currently assumes that if the element schema appears to match the repeated type,
the caller has passed a schema indicating case 1. This issue points out that the matching
isn't perfect, and an element with a single field named "element" will incorrectly match case
1 when it is really case 2. The problem with the solution in PR #14013, if it were applied
to Avro, is that it breaks if the caller is actually passing a schema for case 1.
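
To make the two cases concrete in Spark terms (borrowing the case classes from the reproduction
quoted below; {{AWrapped}} and {{BWrong}} are hypothetical names used only for illustration):

{code}
// What the writer meant (case 2): f is an array of single-field structs.
case class A(element: Long)             // struct with one field named "element"
case class B(f: Array[A])

// What a reader reconstructs if it matches case 1 instead: the single-field struct
// named "element" is mistaken for the spec's element "container" group, so a spurious
// extra nesting level appears.
case class AWrapped(element: A)         // hypothetical: the extra record layer
case class BWrong(f: Array[AWrapped])
{code}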

I'm not sure whether Spark works like Avro and has two {{isElementType}} methods. If Spark
can guarantee that the table schema is never case 1, then it is correct to use the logic in
the PR. I don't think that's always the case because the table schema may come from user objects
in a Dataset or from the Hive metastore. But this may be a reasonable heuristic if you think
case 2 is far more common than case 1. For parquet-avro, I think a user supplying a single-field
record with the inner field named "element" is rare enough that it doesn't really matter,
but that's a call for the Spark community to make on this issue.

One last thing: based on the rest of the schema structure, there should be only one way to
match the expected schema to the file schema. You could always try both and fall back to the
other case, or have a more complicated {{isElementType}} method that recurses down the sub-trees
to find a match. I didn't implement this in parquet-avro because I think it's a rare problem
and not worth the time.
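
If anyone does want to try it, here is a minimal, self-contained sketch of the recursive idea
in Scala; the names ({{PType}}, {{PGroup}}, {{matches}}) are invented for illustration and are
not Spark or parquet-avro classes:

{code}
// Structural schema matching: same field names and recursively matching types.
sealed trait PType
case object PLong extends PType                                  // stand-in for any primitive
case class PGroup(fields: List[(String, PType)]) extends PType

def matches(expected: PType, file: PType): Boolean = (expected, file) match {
  case (PLong, PLong) => true
  case (PGroup(e), PGroup(f)) =>
    e.length == f.length && e.zip(f).forall {
      case ((en, et), (fn, ft)) => en == fn && matches(et, ft)
    }
  case _ => false
}

// The ambiguous repeated group from this issue:
//   repeated group list { optional group element { optional int64 element; } }
val repeated = PGroup(List("element" -> PGroup(List("element" -> PLong))))

// Case 2: the caller expects the list element to be STRUCT<element: BIGINT>.
val expectedElement = PGroup(List("element" -> PLong))

// A shallow, name-based check can't tell the cases apart, but the recursive match can:
matches(expectedElement, repeated.fields.head._2)   // true  -> 3-level, strip the wrapper
matches(expectedElement, repeated)                  // false -> repeated group is not the element
{code}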

> Array of struct with a single field name "element" can't be decoded from Parquet files
> written by Spark 1.6+
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-16344
>                 URL: https://issues.apache.org/jira/browse/SPARK-16344
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 1.6.1, 1.6.2, 2.0.0
>            Reporter: Cheng Lian
>            Assignee: Cheng Lian
>
> This is a weird corner case. Users may hit this issue if they have a schema that
> # has an array field whose element type is a struct, and
> # the struct has one and only one field, and
> # that field is named "element".
> The following Spark shell snippet for Spark 1.6 reproduces this bug:
> {code}
> case class A(element: Long)
> case class B(f: Array[A])
> val path = "/tmp/silly.parquet"
> Seq(B(Array(A(42)))).toDF("f0").write.mode("overwrite").parquet(path)
> val df = sqlContext.read.parquet(path)
> df.printSchema()
> // root
> //  |-- f0: array (nullable = true)
> //  |    |-- element: struct (containsNull = true)
> //  |    |    |-- element: long (nullable = true)
> df.show()
> {code}
> Exception thrown:
> {noformat}
> org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in
> file file:/tmp/silly.parquet/part-r-00007-e06db7b0-5181-4a14-9fee-5bb452e883a0.gz.parquet
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>         at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>         at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: Expected instance of group converter but got
> "org.apache.spark.sql.execution.datasources.parquet.CatalystPrimitiveConverter"
>         at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:37)
>         at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:266)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
>         at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
>         at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
>         at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>         at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
>         ... 26 more
> {noformat}
> Spark 2.0.0-SNAPSHOT and Spark master also suffer from this issue. To reproduce it using
> these versions, just replace {{sqlContext}} in the above snippet with {{spark}}.
> The reason behind this is related to the Parquet backwards-compatibility rules for LIST
> types defined in the [parquet-format spec|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists].
> The Spark SQL schema shown above
> {noformat}
> root
>  |-- f0: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- element: long (nullable = true)
> {noformat}
> is equivalent to the following SQL type:
> {noformat}
> STRUCT<
>   f: ARRAY<
>     STRUCT<element: BIGINT>
>   >
> >
> {noformat}
> According to the parquet-format spec, the standard layout of a LIST-like structure is
> a 3-level layout:
> {noformat}
> <list-repetition> group <name> (LIST) {
>   repeated group list {
>     <element-repetition> <element-type> element;
>   }
> }
> {noformat}
> Thus, the standard representation of the aforementioned SQL type should be:
> {noformat}
> message root {
>   optional group f (LIST) {
>     repeated group list {
>       optional group element {    (1)
>         optional int64 element;   (2)
>       }
>     }
>   }
> }
> {noformat}
> Note that the two "element" fields are different:
> - The {{group}} field "element" at (1) is a "container" of the list element type. It is
> defined as part of the parquet-format spec.
> - The {{int64}} field "element" at (2) corresponds to the {{element}} field of the case
> class {{A}} defined above.
> However, for historical reasons, various existing systems do not conform to the parquet-format
> spec and may write LIST structures in a non-standard layout. For example, parquet-avro and
> parquet-thrift use a 2-level layout like
> {noformat}
> // parquet-avro style
> <list-repetition> group <name> (LIST) {
>   repeated <element-type> array;
> }
> // parquet-thrift style
> <list-repetition> group <name> (LIST) {
>   repeated <element-type> <name>_tuple;
> }
> {noformat}
> To keep backwards compatibility, the parquet-format spec defined a set of
> [backwards-compatibility rules|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules]
> to also recognize these patterns.
> Unfortunately, these backwards-compatibility rules make the Parquet schema we mentioned
> above ambiguous:
> {noformat}
> message root {
>   optional group f (LIST) {
>     repeated group list {
>       optional group element {
>         optional int64 element;
>       }
>     }
>   }
> }
> {noformat}
> When interpreted using the standard 3-level layout, it is the expected type:
> {noformat}
> STRUCT<
>   f: ARRAY<
>     STRUCT<element: BIGINT>
>   >
> >
> {noformat}
> When interpreted using the legacy 2-level layout, it is the unexpected type:
> {noformat}
> // When interpreted as legacy 2-level layout
> STRUCT<
>   f: ARRAY<
>     STRUCT<element: STRUCT<element: BIGINT>>
>   >
> >
> {noformat}
> This is because the nested struct field name happens to be "element", which is used as
> the dedicated name of the element type "container" group in the standard 3-level layout,
> and this leads to the ambiguity.
> Currently, Spark 1.6.x, 2.0.0-SNAPSHOT, and master choose the second interpretation. We can
> fix this issue by giving the standard 3-level layout a higher priority when trying to match
> schema patterns.



