spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From viirya <...@git.apache.org>
Subject [GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...
Date Fri, 19 Oct 2018 00:52:26 GMT
GitHub user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22749#discussion_r226506718
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
---
    @@ -103,75 +88,61 @@ object ExpressionEncoder {
        * name/positional binding is preserved.
        */
       def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
    +    if (encoders.length > 22) {
    +      throw new RuntimeException("Can't construct a tuple encoder for more than 22 encoders.")
    +    }
    +
         encoders.foreach(_.assertUnresolved())
     
         val schema = StructType(encoders.zipWithIndex.map {
           case (e, i) =>
    -        val (dataType, nullable) = if (e.flat) {
    -          e.schema.head.dataType -> e.schema.head.nullable
    -        } else {
    -          e.schema -> true
    -        }
    -        StructField(s"_${i + 1}", dataType, nullable)
    +        StructField(s"_${i + 1}", e.objSerializer.dataType, e.objSerializer.nullable)
         })
     
         val cls = Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}")
     
    -    val serializer = encoders.zipWithIndex.map { case (enc, index) =>
    -      val originalInputObject = enc.serializer.head.collect { case b: BoundReference
=> b }.head
    +    val serializers = encoders.zipWithIndex.map { case (enc, index) =>
    +      val boundRefs = enc.objSerializer.collect { case b: BoundReference => b }.distinct
    +      assert(boundRefs.size == 1, "object serializer should have only one bound reference
but " +
    +        s"there are ${boundRefs.size}")
    +
    +      val originalInputObject = boundRefs.head
           val newInputObject = Invoke(
             BoundReference(0, ObjectType(cls), nullable = true),
             s"_${index + 1}",
    -        originalInputObject.dataType)
    +        originalInputObject.dataType,
    +        returnNullable = originalInputObject.nullable)
     
    -      val newSerializer = enc.serializer.map(_.transformUp {
    +      val newSerializer = enc.objSerializer.transformUp {
             case b: BoundReference if b == originalInputObject => newInputObject
    --- End diff --
    
    yes, right.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message