spark-commits mailing list archives

From lix...@apache.org
Subject spark git commit: [SPARK-19987][SQL] Pass all filters into FileIndex
Date Fri, 17 Mar 2017 01:32:02 GMT
Repository: spark
Updated Branches:
  refs/heads/master 4c3200546 -> 8537c00e0


[SPARK-19987][SQL] Pass all filters into FileIndex

## What changes were proposed in this pull request?
This is a teeny tiny refactoring that passes the data filters into the FileIndex as well, so the
FileIndex can have a more global view of the predicates.
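For illustration only (not part of this patch), here is a minimal sketch of what the new contract looks like from an implementer's side, written against the two-argument `listFiles` introduced here. The class name `StaticFileIndex` and its constructor are made-up assumptions; only the overridden members follow the `FileIndex` trait as changed in this commit.

```scala
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
import org.apache.spark.sql.types.StructType

// Hypothetical, unpartitioned FileIndex over a fixed set of files, written
// against the new two-argument listFiles signature from this patch.
class StaticFileIndex(files: Seq[FileStatus]) extends FileIndex {

  override def rootPaths: Seq[Path] = files.map(_.getPath)

  override def partitionSchema: StructType = new StructType()

  override def listFiles(
      partitionFilters: Seq[Expression],
      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    // partitionFilters must be honored exactly; dataFilters are best-effort
    // hints (e.g. for file-level skipping) because the execution engine still
    // re-evaluates them on the returned rows. This sketch simply ignores them.
    PartitionDirectory(InternalRow.empty, files) :: Nil
  }

  override def inputFiles: Array[String] = files.map(_.getPath.toString).toArray

  override def refresh(): Unit = ()

  override def sizeInBytes: Long = files.map(_.getLen).sum
}
```

The key point is that partition filters carry a hard guarantee while data filters are advisory: an index may use them for file-level skipping or ignore them entirely.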

## How was this patch tested?
The change should be covered by existing test cases.

Author: Reynold Xin <rxin@databricks.com>

Closes #17322 from rxin/SPARK-19987.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8537c00e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8537c00e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8537c00e

Branch: refs/heads/master
Commit: 8537c00e0a17eff2a8c6745fbdd1d08873c0434d
Parents: 4c32005
Author: Reynold Xin <rxin@databricks.com>
Authored: Thu Mar 16 18:31:57 2017 -0700
Committer: Xiao Li <gatorsmile@gmail.com>
Committed: Thu Mar 16 18:31:57 2017 -0700

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      | 23 ++++++++++++--------
 .../execution/OptimizeMetadataOnlyQuery.scala   |  2 +-
 .../datasources/CatalogFileIndex.scala          |  5 +++--
 .../sql/execution/datasources/FileIndex.scala   | 15 ++++++++-----
 .../datasources/FileSourceStrategy.scala        |  5 +----
 .../PartitioningAwareFileIndex.scala            |  8 ++++---
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  4 +---
 7 files changed, 35 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8537c00e/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 8ebad67..bfe9c8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -23,18 +23,18 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
 trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
@@ -135,7 +135,7 @@ case class RowDataSourceScanExec(
  * @param output Output attributes of the scan.
  * @param outputSchema Output schema of the scan.
  * @param partitionFilters Predicates to use for partition pruning.
- * @param dataFilters Data source filters to use for filtering data within partitions.
+ * @param dataFilters Filters on non-partition columns.
  * @param metastoreTableIdentifier identifier for the table in the metastore.
  */
 case class FileSourceScanExec(
@@ -143,7 +143,7 @@ case class FileSourceScanExec(
     output: Seq[Attribute],
     outputSchema: StructType,
     partitionFilters: Seq[Expression],
-    dataFilters: Seq[Filter],
+    dataFilters: Seq[Expression],
     override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with ColumnarBatchScan  {
 
@@ -156,7 +156,8 @@ case class FileSourceScanExec(
     false
   }
 
-  @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)
+  @transient private lazy val selectedPartitions =
+    relation.location.listFiles(partitionFilters, dataFilters)
 
   override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -225,6 +226,10 @@ case class FileSourceScanExec(
     }
   }
 
+  @transient
+  private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
+  logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
+
   // These metadata values make scan plans uniquely identifiable for equality checking.
   override val metadata: Map[String, String] = {
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
@@ -237,7 +242,7 @@ case class FileSourceScanExec(
         "ReadSchema" -> outputSchema.catalogString,
         "Batched" -> supportsBatch.toString,
         "PartitionFilters" -> seqToString(partitionFilters),
-        "PushedFilters" -> seqToString(dataFilters),
+        "PushedFilters" -> seqToString(pushedDownFilters),
         "Location" -> locationDesc)
     val withOptPartitionCount =
       relation.partitionSchemaOption.map { _ =>
@@ -255,7 +260,7 @@ case class FileSourceScanExec(
         dataSchema = relation.dataSchema,
         partitionSchema = relation.partitionSchema,
         requiredSchema = outputSchema,
-        filters = dataFilters,
+        filters = pushedDownFilters,
         options = relation.options,
         hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 

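Side note on the pushedDownFilters value added to FileSourceScanExec above: below is a hedged sketch of what that translation step does. The call to DataSourceStrategy.translateFilter is the one used in this commit; the demo object name is hypothetical, and the snippet sits in the datasources package only because translateFilter appears to be visible only within the org.apache.spark.sql package (protected[sql]).

```scala
package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.IntegerType

// Hypothetical demo object (not in the patch). It shows the translation that
// FileSourceScanExec now performs on dataFilters: Catalyst predicates become
// data source Filters where a translation exists, and the rest drop out of
// the pushed set via flatMap, which is safe because the post-scan Filter
// operator still evaluates every data filter.
object TranslateFilterSketch {
  def main(args: Array[String]): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val pushed: Option[sources.Filter] =
      DataSourceStrategy.translateFilter(EqualTo(a, Literal(1)))
    println(pushed) // expected: Some(EqualTo(a,1))
  }
}
```
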
http://git-wip-us.apache.org/repos/asf/spark/blob/8537c00e/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 769deb1..3c046ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -98,7 +98,7 @@ case class OptimizeMetadataOnlyQuery(
         relation match {
           case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
             val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-            val partitionData = fsRelation.location.listFiles(filters = Nil)
+            val partitionData = fsRelation.location.listFiles(Nil, Nil)
             LocalRelation(partAttrs, partitionData.map(_.values))
 
           case relation: CatalogRelation =>

http://git-wip-us.apache.org/repos/asf/spark/blob/8537c00e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index d6c4b97..db0254f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -54,8 +54,9 @@ class CatalogFileIndex(
 
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
 
-  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
-    filterPartitions(filters).listFiles(Nil)
+  override def listFiles(
+      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    filterPartitions(partitionFilters).listFiles(Nil, dataFilters)
   }
 
   override def refresh(): Unit = fileStatusCache.invalidateAll()

http://git-wip-us.apache.org/repos/asf/spark/blob/8537c00e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index 277223d..6b99d38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -46,12 +46,17 @@ trait FileIndex {
    * Returns all valid files grouped into partitions when the data is partitioned. If the data is
    * unpartitioned, this will return a single partition with no partition values.
    *
-   * @param filters The filters used to prune which partitions are returned.  These filters must
-   *                only refer to partition columns and this method will only return files
-   *                where these predicates are guaranteed to evaluate to `true`.  Thus, these
-   *                filters will not need to be evaluated again on the returned data.
+   * @param partitionFilters The filters used to prune which partitions are returned. These filters
+   *                         must only refer to partition columns and this method will only return
+   *                         files where these predicates are guaranteed to evaluate to `true`.
+   *                         Thus, these filters will not need to be evaluated again on the
+   *                         returned data.
+   * @param dataFilters Filters that can be applied on non-partitioned columns. The implementation
+   *                    does not need to guarantee these filters are applied, i.e. the execution
+   *                    engine will ensure these filters are still applied on the returned files.
    */
-  def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
+  def listFiles(
+      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory]
 
   /**
    * Returns the list of files that will be read when scanning this relation. This call may be

http://git-wip-us.apache.org/repos/asf/spark/blob/8537c00e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 26e1380..17f7e0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -100,9 +100,6 @@ object FileSourceStrategy extends Strategy with Logging {
       val outputSchema = readDataColumns.toStructType
       logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
 
-      val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
-      logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
-
       val outputAttributes = readDataColumns ++ partitionColumns
 
       val scan =
@@ -111,7 +108,7 @@ object FileSourceStrategy extends Strategy with Logging {
           outputAttributes,
           outputSchema,
           partitionKeyFilters.toSeq,
-          pushedDownFilters,
+          dataFilters,
           table.map(_.identifier))
 
       val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)

http://git-wip-us.apache.org/repos/asf/spark/blob/8537c00e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index db8bbc5..71500a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -54,17 +54,19 @@ abstract class PartitioningAwareFileIndex(
 
   override def partitionSchema: StructType = partitionSpec().partitionColumns
 
-  protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
+  protected val hadoopConf: Configuration =
+    sparkSession.sessionState.newHadoopConfWithOptions(parameters)
 
   protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
 
   protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
 
-  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
+  override def listFiles(
+      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
       PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
     } else {
-      prunePartitions(filters, partitionSpec()).map {
+      prunePartitions(partitionFilters, partitionSpec()).map {
         case PartitionPath(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/8537c00e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 9f0d1ce..2e060ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import java.net.URI
-
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.Striped
@@ -248,7 +246,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         .inferSchema(
           sparkSession,
           options,
-          fileIndex.listFiles(Nil).flatMap(_.files))
+          fileIndex.listFiles(Nil, Nil).flatMap(_.files))
         .map(mergeWithMetastoreSchema(relation.tableMeta.schema, _))
 
       inferredSchema match {



