spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cloud-fan <...@git.apache.org>
Subject [GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...
Date Tue, 12 Jun 2018 19:18:33 GMT
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194858392
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource,
WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader)
:: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column
pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different
column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when
the required
    +        // projection is more complicated than column pruning, base column pruning on
the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    > there's nothing forcing other data sources to implement the new trait ...
    
    hmmm, I'm a little confused here. All v2 data sources would have to apply pushdown twice,
right? Or are you suggesting we should not migrate file-based data sources to data source v2?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message