spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject spark git commit: [SPARK-4520] [SQL] This pr fixes the ArrayIndexOutOfBoundsException as r...
Date Thu, 05 Feb 2015 03:18:48 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 478ee3f91 -> aa6f4cae0


[SPARK-4520] [SQL] This pr fixes the ArrayIndexOutOfBoundsException as raised in SPARK-4520.

The exception is thrown only for Thrift-generated Parquet files. The array element schema
name is assumed to be "array", as per ParquetAvro, but for Thrift-generated Parquet files it
is array_name + "_tuple". As a result, the child of the array group type is not found, which
leads to the exception when the Parquet rows are being materialized.

Author: Sadhan Sood <sadhan@tellapart.com>

Closes #4148 from sadhan/SPARK-4520 and squashes the following commits:

c5ccde8 [Sadhan Sood] [SPARK-4520] [SQL] This pr fixes the ArrayIndexOutOfBoundsException
as raised in SPARK-4520.

(cherry picked from commit dba98bf6987ec39380f1a5b0ca2772b694452231)
Signed-off-by: Cheng Lian <lian@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa6f4cae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa6f4cae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa6f4cae

Branch: refs/heads/branch-1.3
Commit: aa6f4cae0c8213df108c06f55a51bd4b2ac1747f
Parents: 478ee3f
Author: Sadhan Sood <sadhan@tellapart.com>
Authored: Wed Feb 4 19:18:06 2015 -0800
Committer: Cheng Lian <lian@databricks.com>
Committed: Wed Feb 4 19:18:41 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/parquet/ParquetConverter.scala    |  5 +++
 .../spark/sql/parquet/ParquetTableSupport.scala |  6 +++-
 .../apache/spark/sql/parquet/ParquetTypes.scala | 35 ++++++++++++++------
 .../spark/sql/parquet/ParquetSchemaSuite.scala  | 28 ++++++++++++++--
 4 files changed, 60 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aa6f4cae/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index d87ddfe..7d62f37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -66,6 +66,11 @@ private[sql] object CatalystConverter {
   // Using a different value will result in Parquet silently dropping columns.
   val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
   val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
+  // SPARK-4520: Thrift generated parquet files have different array element
+  // schema names than avro. Thrift parquet uses array_schema_name + "_tuple"
+  // as opposed to "array" used by default. For more information, check
+  // TestThriftSchemaConverter.java in parquet.thrift.
+  val THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX = "_tuple"
   val MAP_KEY_SCHEMA_NAME = "key"
   val MAP_VALUE_SCHEMA_NAME = "value"
   val MAP_SCHEMA_NAME = "map"

http://git-wip-us.apache.org/repos/asf/spark/blob/aa6f4cae/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 3fb1cc4..14c81ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -99,7 +99,11 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
     val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)
 
     if (requestedAttributes != null) {
-      parquetSchema = ParquetTypesConverter.convertFromAttributes(requestedAttributes)
+      // If the parquet file is thrift derived, there is a good chance that
+      // it will have the thrift class in metadata.
+      val isThriftDerived = keyValueMetaData.keySet().contains("thrift.class")
+      parquetSchema = ParquetTypesConverter
+        .convertFromAttributes(requestedAttributes, isThriftDerived)
       metadata.put(
         RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
         ParquetTypesConverter.convertToString(requestedAttributes))

http://git-wip-us.apache.org/repos/asf/spark/blob/aa6f4cae/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index f1d4ff2..b646109 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -285,13 +285,19 @@ private[parquet] object ParquetTypesConverter extends Logging {
       ctype: DataType,
       name: String,
       nullable: Boolean = true,
-      inArray: Boolean = false): ParquetType = {
+      inArray: Boolean = false, 
+      toThriftSchemaNames: Boolean = false): ParquetType = {
     val repetition =
       if (inArray) {
         Repetition.REPEATED
       } else {
         if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED
       }
+    val arraySchemaName = if (toThriftSchemaNames) {
+      name + CatalystConverter.THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX
+    } else {
+      CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME
+    }
     val typeInfo = fromPrimitiveDataType(ctype)
     typeInfo.map {
       case ParquetTypeInfo(primitiveType, originalType, decimalMetadata, length) =>
@@ -306,22 +312,24 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }.getOrElse {
       ctype match {
         case udt: UserDefinedType[_] => {
-          fromDataType(udt.sqlType, name, nullable, inArray)
+          fromDataType(udt.sqlType, name, nullable, inArray, toThriftSchemaNames)
         }
         case ArrayType(elementType, false) => {
           val parquetElementType = fromDataType(
             elementType,
-            CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+            arraySchemaName,
             nullable = false,
-            inArray = true)
+            inArray = true,
+            toThriftSchemaNames)
           ConversionPatterns.listType(repetition, name, parquetElementType)
         }
         case ArrayType(elementType, true) => {
           val parquetElementType = fromDataType(
             elementType,
-            CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+            arraySchemaName,
             nullable = true,
-            inArray = false)
+            inArray = false,
+            toThriftSchemaNames)
           ConversionPatterns.listType(
             repetition,
             name,
@@ -332,7 +340,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
         }
         case StructType(structFields) => {
           val fields = structFields.map {
-            field => fromDataType(field.dataType, field.name, field.nullable, inArray = false)
+            field => fromDataType(field.dataType, field.name, field.nullable, 
+                                  inArray = false, toThriftSchemaNames)
           }
           new ParquetGroupType(repetition, name, fields.toSeq)
         }
@@ -342,13 +351,15 @@ private[parquet] object ParquetTypesConverter extends Logging {
               keyType,
               CatalystConverter.MAP_KEY_SCHEMA_NAME,
               nullable = false,
-              inArray = false)
+              inArray = false,
+              toThriftSchemaNames)
           val parquetValueType =
             fromDataType(
               valueType,
               CatalystConverter.MAP_VALUE_SCHEMA_NAME,
               nullable = valueContainsNull,
-              inArray = false)
+              inArray = false,
+              toThriftSchemaNames)
           ConversionPatterns.mapType(
             repetition,
             name,
@@ -374,10 +385,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
             field.getRepetition != Repetition.REQUIRED)())
   }
 
-  def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
+  def convertFromAttributes(attributes: Seq[Attribute],
+                            toThriftSchemaNames: Boolean = false): MessageType = {
     val fields = attributes.map(
       attribute =>
-        fromDataType(attribute.dataType, attribute.name, attribute.nullable))
+        fromDataType(attribute.dataType, attribute.name, attribute.nullable,
+                     toThriftSchemaNames = toThriftSchemaNames))
     new MessageType("root", fields)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/aa6f4cae/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index 6427495..5f7f31d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -33,9 +33,10 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
    * Checks whether the reflected Parquet message type for product type `T` conforms `messageType`.
    */
   private def testSchema[T <: Product: ClassTag: TypeTag](
-      testName: String, messageType: String): Unit = {
+      testName: String, messageType: String, isThriftDerived: Boolean = false): Unit = {
     test(testName) {
-      val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T])
+      val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T],
+                                                               isThriftDerived)
       val expected = MessageTypeParser.parseMessageType(messageType)
       actual.checkContains(expected)
       expected.checkContains(actual)
@@ -146,6 +147,29 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       |}
     """.stripMargin)
 
+  // Test for SPARK-4520 -- ensure that thrift generated parquet schema is generated
+  // as expected from attributes
+  testSchema[(Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])](
+    "thrift generated parquet schema",
+    """
+      |message root {
+      |  optional binary _1 (UTF8);
+      |  optional binary _2 (UTF8);
+      |  optional binary _3 (UTF8);
+      |  optional group _4 (LIST) {
+      |    repeated int32 _4_tuple;
+      |  }
+      |  optional group _5 (MAP) {
+      |    repeated group map (MAP_KEY_VALUE) {
+      |      required binary key (UTF8);
+      |      optional group value (LIST) {
+      |        repeated int32 value_tuple;
+      |      }
+      |    }
+      |  }
+      |}
+    """.stripMargin, isThriftDerived = true)
+
   test("DataType string parser compatibility") {
     // This is the generated string from previous versions of the Spark SQL, using the following:
     // val schema = StructType(List(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message