spark-reviews mailing list archives

From hvanhovell <...@git.apache.org>
Subject [GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...
Date Fri, 09 Mar 2018 11:53:15 GMT
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20771#discussion_r173431326
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
    @@ -599,8 +610,79 @@ case class MapObjects private(
     
       override def children: Seq[Expression] = lambdaFunction :: inputData :: Nil
     
    -  override def eval(input: InternalRow): Any =
    -    throw new UnsupportedOperationException("Only code-generated evaluation is supported")
    +  // Data of a UserDefinedType is actually stored using the data type of its sqlType.
    +  // When we apply MapObjects on it, we have to use that sqlType.
    +  private lazy val inputDataType = inputData.dataType match {
    +    case u: UserDefinedType[_] => u.sqlType
    +    case _ => inputData.dataType
    +  }
    +
    +  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
    +    inputCollection.map { element =>
    +      val row = InternalRow.fromSeq(Seq(element))
    +      lambdaFunction.eval(row)
    +    }
    +  }
    +
    +  // Executes the lambda function on the input collection.
    +  private lazy val executeFunc: Any => Seq[_] = inputDataType match {
    +    case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
    +      x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
    +    case ObjectType(cls) if cls.isArray =>
    +      x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
    +    case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
    +      x => executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
    +    case ObjectType(cls) if cls == classOf[Object] =>
    +      (inputCollection) => {
    +        if (inputCollection.getClass.isArray) {
    +          executeFuncOnCollection(inputCollection.asInstanceOf[Array[_]].toSeq)
    +        } else {
    +          executeFuncOnCollection(inputCollection.asInstanceOf[Seq[_]])
    +        }
    +      }
    +    case ArrayType(et, _) =>
    +      x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array)
    +  }
    +
    +  // Converts the processed collection to the custom collection class, if any.
    +  private lazy val getResults: Seq[_] => Any = customCollectionCls match {
    +    case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
    +      // Scala sequence
    +      _.toSeq
    +    case Some(cls) if classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
    +      // Scala set
    +      _.toSet
    +    case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
    +      // Java list
    +      if (cls == classOf[java.util.List[_]] || cls == classOf[java.util.AbstractList[_]] ||
    +          cls == classOf[java.util.AbstractSequentialList[_]]) {
    +        _.asJava
    +      } else {
    +        (results) => {
    +          val builder = Try(cls.getConstructor(Integer.TYPE)).map { constructor =>
    --- End diff --
    
    Can you try to do the constructor lookup only once? The duplication this will cause is ok.
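    
    For illustration, a minimal sketch of the suggestion: resolve the constructor once, when the
    converter closure is built, rather than on every call. The helper name buildCustomJavaList and
    its exact shape are assumptions for this sketch, not the PR's final code; cls stands for the
    custom collection class matched in getResults above.
    
        import scala.util.Try
    
        // Sketch only: build the Seq => java.util.List converter once, so the
        // reflective constructor lookup is not repeated for every input row.
        private def buildCustomJavaList(cls: Class[_]): Seq[_] => Any = {
          // Prefer an (int initialCapacity) constructor if the class offers one.
          Try(cls.getConstructor(Integer.TYPE)).toOption match {
            case Some(sizedCtor) =>
              results => {
                val list = sizedCtor.newInstance(Integer.valueOf(results.length))
                  .asInstanceOf[java.util.List[Any]]
                results.foreach(r => list.add(r))
                list
              }
            case None =>
              // Fall back to the no-arg constructor, also resolved just once here.
              val defaultCtor = cls.getConstructor()
              results => {
                val list = defaultCtor.newInstance().asInstanceOf[java.util.List[Any]]
                results.foreach(r => list.add(r))
                list
              }
          }
        }
    
    With that, the Java-list branch of getResults could simply return the closure produced by
    buildCustomJavaList(cls), accepting the small duplication between the two constructor paths
    that the comment says is acceptable.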


---
