spark-reviews mailing list archives

From kiszk <...@git.apache.org>
Subject [GitHub] spark pull request #18747: [SPARK-20822][SQL] Generate code to directly get ...
Date Fri, 13 Oct 2017 17:11:39 GMT
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18747#discussion_r144609180
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -23,21 +23,37 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
     import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.plans.QueryPlan
     import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
    -import org.apache.spark.sql.execution.LeafExecNode
    -import org.apache.spark.sql.execution.metric.SQLMetrics
    +import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode}
    +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
     import org.apache.spark.sql.types.UserDefinedType
     
     
     case class InMemoryTableScanExec(
         attributes: Seq[Attribute],
         predicates: Seq[Expression],
         @transient relation: InMemoryRelation)
    -  extends LeafExecNode {
    +  extends LeafExecNode with ColumnarBatchScan {
     
       override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
     
    -  override lazy val metrics = Map(
    -    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
    +  override def vectorTypes: Option[Seq[String]] =
    +    Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName))
    +
    +  override val columnIndexes =
    +    attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray
    +
    +  override val supportCodegen: Boolean = relation.useColumnarBatches
    +
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    +    if (supportCodegen) {
    +      val buffers = relation.cachedColumnBuffers
    +      // HACK ALERT: This is actually an RDD[CachedBatch].
    +      // We're taking advantage of Scala's type erasure here to pass these batches along.
    +      Seq(buffers.asInstanceOf[RDD[InternalRow]])
    --- End diff --
    
    Yes, I break that assumption ([`RDD[CachedBatch]`](https://github.com/apache/spark/pull/18747/files#diff-2654ea5e4013b9ad5f43d64b68413225R115)),
    since we have to create the `ColumnarBatch` only when the data is actually read.
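    
    For anyone puzzled by the `HACK ALERT` in the hunk above: the cast works
    only because of erasure. Here is a minimal plain-Scala sketch, with
    hypothetical stand-in types rather than the real Spark classes, of why it
    compiles and succeeds until an element is actually used as a row:
    
    ```scala
    // Hypothetical stand-ins for the Spark types, just to show the cast.
    final case class CachedBatch(numRows: Int)
    trait InternalRow
    
    object ErasureDemo {
      def main(args: Array[String]): Unit = {
        val batches: Seq[CachedBatch] = Seq(CachedBatch(10), CachedBatch(20))
    
        // Type arguments are erased at runtime, so Seq[CachedBatch] and
        // Seq[InternalRow] are the same runtime class; the unchecked cast
        // succeeds without inspecting any element.
        val disguised = batches.asInstanceOf[Seq[InternalRow]]
    
        println(disguised.length) // fine: no element is used as an InternalRow
    
        // A ClassCastException appears only at element access, where the
        // compiler inserts a checkcast:
        // val row: InternalRow = disguised.head  // would throw here
      }
    }
    ```
    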
    Should we convert `CachedBatch` to `ColumnarBatch` here instead?
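    
    For concreteness, the shape such a conversion could take, as a
    self-contained sketch with simplified stand-in types (not the real
    `CachedBatch`/`ColumnarBatch` APIs; `decompress` is only a placeholder
    for the real per-column decompression path):
    
    ```scala
    // Simplified stand-ins; the real conversion would go through the
    // column accessor / decompression machinery in sql/core.
    final case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]])
    final class ColumnVector(val data: Array[Byte])
    final class ColumnarBatch(val columns: Array[ColumnVector], val numRows: Int)
    
    object ConvertSketch {
      // Placeholder: the real code would decompress the serialized column
      // bytes into an on-heap vector; here it is just a defensive copy.
      private def decompress(buffer: Array[Byte]): ColumnVector =
        new ColumnVector(buffer.clone())
    
      // Convert one CachedBatch into a ColumnarBatch, projecting and
      // reordering columns via columnIndexes, as the diff above does.
      def toColumnarBatch(cached: CachedBatch, columnIndexes: Array[Int]): ColumnarBatch =
        new ColumnarBatch(columnIndexes.map(i => decompress(cached.buffers(i))), cached.numRows)
    
      def main(args: Array[String]): Unit = {
        val cached = CachedBatch(2, Array(Array[Byte](1, 2), Array[Byte](3, 4)))
        val batch = toColumnarBatch(cached, Array(1, 0)) // select columns 1, 0
        println(s"rows=${batch.numRows}, cols=${batch.columns.length}")
      }
    }
    ```
    
    Converting here would keep `inputRDDs()` honest about its element type, at
    the cost of doing the batch construction eagerly; the erasure cast defers
    that work until the generated code reads the batch.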


---
