spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liancheng <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-10301] [SQL] Fixes schema merging for n...
Date Sat, 05 Sep 2015 09:16:29 GMT
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8509#discussion_r38809151
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala ---
    @@ -160,4 +101,168 @@ private[parquet] object CatalystReadSupport {
       val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
     
       val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
    +
    +  /**
    +   * Tailors `parquetSchema` according to `catalystSchema` by removing column paths that don't exist
    +   * in `catalystSchema`, and adding those that only exist in `catalystSchema`.
    +   */
    +  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
    +    val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
    +    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
    +  }
    +
    +  private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
    +    catalystType match {
    +      case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
    +        // Only clips array types with nested type as element type.
    +        clipParquetListType(parquetType.asGroupType(), t.elementType)
    +
    +      case t: MapType if !isPrimitiveCatalystType(t.valueType) =>
    +        // Only clips map types with nested type as value type.
    +        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
    +
    +      case t: StructType =>
    +        clipParquetGroup(parquetType.asGroupType(), t)
    +
    +      case _ =>
    +        parquetType
    +    }
    +  }
    +
    +  /**
    +   * Whether a Catalyst [[DataType]] is primitive.  Primitive [[DataType]] is not equivalent to
    +   * [[AtomicType]].  For example, [[CalendarIntervalType]] is primitive, but it's not an
    +   * [[AtomicType]].
    +   */
    +  private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
    +    dataType match {
    +      case _: ArrayType | _: MapType | _: StructType => false
    +      case _ => true
    +    }
    +  }
    +
    +  /**
    +   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]].  The element type
    +   * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
    +   * [[StructType]].
    +   */
    +  private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
    +    // Precondition of this method, should only be called for lists with nested element types.
    +    assert(!isPrimitiveCatalystType(elementType))
    +
    +    // Unannotated repeated group should be interpreted as required list of required element, so
    +    // list element type is just the group itself.  Clip it.
    +    if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
    +      clipParquetType(parquetList, elementType)
    +    } else {
    +      assert(
    +        parquetList.getOriginalType == OriginalType.LIST,
    +        "Invalid Parquet schema. " +
    +          "Original type of annotated Parquet lists must be LIST: " +
    +          parquetList.toString)
    +
    +      assert(
    +        parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED),
    +        "Invalid Parquet schema. " +
    +          "LIST-annotated group should only have exactly one repeated field: " +
    +          parquetList)
    +
    +      // Precondition of this method, should only be called for lists with nested element types.
    +      assert(!parquetList.getType(0).isPrimitive)
    +
    +      val repeatedGroup = parquetList.getType(0).asGroupType()
    +
    +      // If the repeated field is a group with multiple fields, or the repeated field is a group
    +      // with one field and is named either "array" or uses the LIST-annotated group's name with
    +      // "_tuple" appended then the repeated type is the element type and elements are required.
    +      // Build a new LIST-annotated group with clipped `repeatedGroup` as element type and the
    +      // only field.
    +      if (
    +        repeatedGroup.getFieldCount > 1 ||
    +        repeatedGroup.getName == "array" ||
    +        repeatedGroup.getName == parquetList.getName + "_tuple"
    +      ) {
    +        Types
    +          .buildGroup(parquetList.getRepetition)
    +          .as(OriginalType.LIST)
    +          .addField(clipParquetType(repeatedGroup, elementType))
    +          .named(parquetList.getName)
    +      } else {
    +        // Otherwise, the repeated field's type is the element type with the repeated field's
    +        // repetition.
    +        Types
    +          .buildGroup(parquetList.getRepetition)
    +          .as(OriginalType.LIST)
    +          .addField(
    +            Types
    +              .repeatedGroup()
    +              .addField(clipParquetType(repeatedGroup.getType(0), elementType))
    +              .named(repeatedGroup.getName))
    +          .named(parquetList.getName)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]].  The value type
    +   * of the [[MapType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
    +   * [[StructType]].  Note that key type of any [[MapType]] is always a primitive type.
    +   */
    +  private def clipParquetMapType(
    +      parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
    +    // Precondition of this method, should only be called for maps with nested value types.
    +    assert(!isPrimitiveCatalystType(valueType))
    +
    +    val repeatedGroup = parquetMap.getType(0).asGroupType()
    +    val parquetKeyType = repeatedGroup.getType(0)
    +    val parquetValueType = repeatedGroup.getType(1)
    +
    +    val clippedRepeatedGroup =
    +      Types
    +        .repeatedGroup()
    +        .as(repeatedGroup.getOriginalType)
    +        .addField(parquetKeyType)
    --- End diff --
    
    Spark SQL doesn't allow map keys to be complex types, so the key field can be added as-is without clipping. This is consistent with Hive.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message