spark-commits mailing list archives

From: dbt...@apache.org
Subject: spark git commit: [SPARK-25363][SQL] Fix schema pruning in where clause by ignoring unnecessary root fields
Date: Wed, 12 Sep 2018 17:44:11 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 071babbab -> 4c1428fa2


[SPARK-25363][SQL] Fix schema pruning in where clause by ignoring unnecessary root fields

## What changes were proposed in this pull request?

Schema pruning doesn't work if a nested column is used in the where clause.

For example,
```
sql("select name.first from contacts where name.first = 'David'")

== Physical Plan ==
*(1) Project [name#19.first AS first#40]
+- *(1) Filter (isnotnull(name#19) && (name#19.first = David))
   +- *(1) FileScan parquet [name#19] Batched: false, Format: Parquet, PartitionFilters: [],
    PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:struct<first:string,middle:string,last:string>>
```

In the above query plan, the scan node reads the entire schema of the `name` column.
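With this change, the same query should read only the accessed leaf field, as asserted by the new `checkScan` tests below; the expected read schema is roughly:

```
ReadSchema: struct<name:struct<first:string>>
```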

This issue was reported at:
https://github.com/apache/spark/pull/21320#issuecomment-419290197

The cause is that we infer a root field from the expression `IsNotNull(name)`. However, such an
expression doesn't actually use any nested fields of this root field, so we can ignore the
unnecessary nested fields.
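As a rough standalone sketch of this idea (simplified from the `ParquetSchemaPruning` change below; the illustrative `RootField` here carries only a name, whereas the real one wraps a `StructField`):

```scala
// Simplified sketch: a root field whose content is never accessed (e.g. it is
// referenced only via IsNotNull/IsNull) survives pruning only when no other
// expression already requires that root field with its content.
case class RootField(name: String, contentAccessed: Boolean = true)

def prunedRootFields(fields: Seq[RootField]): Seq[RootField] = {
  val (accessed, notAccessed) = fields.distinct.partition(_.contentAccessed)
  notAccessed.filterNot(na => accessed.exists(_.name == na.name)) ++ accessed
}

// For `SELECT name.first FROM contacts WHERE name IS NOT NULL`, the inferred
// roots are RootField("name") (from `name.first`) and
// RootField("name", contentAccessed = false) (from `IsNotNull(name)`);
// pruning keeps only the former.
```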

## How was this patch tested?

Unit tests.

Closes #22357 from viirya/SPARK-25363.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
(cherry picked from commit 3030b82c89d3e45a2e361c469fbc667a1e43b854)
Signed-off-by: DB Tsai <d_tsai@apple.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c1428fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c1428fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c1428fa

Branch: refs/heads/branch-2.4
Commit: 4c1428fa2b29c371458977427561d2b4bb9daa5b
Parents: 071babb
Author: Liang-Chi Hsieh <viirya@gmail.com>
Authored: Wed Sep 12 17:43:40 2018 +0000
Committer: DB Tsai <d_tsai@apple.com>
Committed: Wed Sep 12 17:44:02 2018 +0000

----------------------------------------------------------------------
 .../parquet/ParquetSchemaPruning.scala          | 34 +++++++--
 .../parquet/ParquetSchemaPruningSuite.scala     | 77 +++++++++++++++++---
 2 files changed, 96 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4c1428fa/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
index 6a46b5f..91080b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -110,7 +110,17 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
     val projectionRootFields = projects.flatMap(getRootFields)
     val filterRootFields = filters.flatMap(getRootFields)
 
-    (projectionRootFields ++ filterRootFields).distinct
+    // Some kinds of expressions don't need to access any fields of a root field, e.g., `IsNotNull`.
+    // For them, if there are any nested fields accessed in the query, we don't need to add the
+    // root field access for those expressions.
+    // For example, for the query `SELECT name.first FROM contacts WHERE name IS NOT NULL`,
+    // we don't need to read any nested field of the `name` struct other than `first`.
+    val (rootFields, optRootFields) = (projectionRootFields ++ filterRootFields)
+      .distinct.partition(_.contentAccessed)
+
+    optRootFields.filter { opt =>
+      !rootFields.exists(_.field.name == opt.field.name)
+    } ++ rootFields
   }
 
   /**
@@ -156,7 +166,7 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
     // in the resulting schema may differ from their ordering in the logical relation's
     // original schema
     val mergedSchema = requestedRootFields
-      .map { case RootField(field, _) => StructType(Array(field)) }
+      .map { case root: RootField => StructType(Array(root.field)) }
       .reduceLeft(_ merge _)
     val dataSchemaFieldNames = fileDataSchema.fieldNames.toSet
     val mergedDataSchema =
@@ -199,6 +209,15 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
       case att: Attribute =>
        RootField(StructField(att.name, att.dataType, att.nullable), derivedFromAtt = true) :: Nil
       case SelectedField(field) => RootField(field, derivedFromAtt = false) :: Nil
+      // Root field accesses by `IsNotNull` and `IsNull` are special cases as the expressions
+      // don't actually use any nested fields. These root field accesses might be excluded later
+      // if there are any nested field accesses in the query plan.
+      case IsNotNull(SelectedField(field)) =>
+        RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
+      case IsNull(SelectedField(field)) =>
+        RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
+      case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
+        expr.children.flatMap(getRootFields).map(_.copy(contentAccessed = false))
       case _ =>
         expr.children.flatMap(getRootFields)
     }
@@ -250,8 +269,11 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
     }
 
   /**
-   * A "root" schema field (aka top-level, no-parent) and whether it was derived from
-   * an attribute or had a proper child.
+   * This represents a "root" schema field (aka top-level, no-parent). `field` is the
+   * `StructField` carrying the field name and datatype. `derivedFromAtt` indicates whether it
+   * was derived from an attribute or had a proper child. `contentAccessed` indicates whether
+   * its content is accessed by the expressions that refer to it.
    */
-  private case class RootField(field: StructField, derivedFromAtt: Boolean)
+  private case class RootField(field: StructField, derivedFromAtt: Boolean,
+    contentAccessed: Boolean = true)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4c1428fa/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
index eb99654..7b132af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
@@ -35,22 +35,29 @@ class ParquetSchemaPruningSuite
     with SchemaPruningTest
     with SharedSQLContext {
   case class FullName(first: String, middle: String, last: String)
+  case class Company(name: String, address: String)
+  case class Employer(id: Int, company: Company)
   case class Contact(
     id: Int,
     name: FullName,
     address: String,
     pets: Int,
     friends: Array[FullName] = Array.empty,
-    relatives: Map[String, FullName] = Map.empty)
+    relatives: Map[String, FullName] = Map.empty,
+    employer: Employer = null)
 
   val janeDoe = FullName("Jane", "X.", "Doe")
   val johnDoe = FullName("John", "Y.", "Doe")
   val susanSmith = FullName("Susan", "Z.", "Smith")
 
+  val employer = Employer(0, Company("abc", "123 Business Street"))
+  val employerWithNullCompany = Employer(1, null)
+
   private val contacts =
     Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
-      relatives = Map("brother" -> johnDoe)) ::
-    Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil
+      relatives = Map("brother" -> johnDoe), employer = employer) ::
+    Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe),
+      employer = employerWithNullCompany) :: Nil
 
   case class Name(first: String, last: String)
   case class BriefContact(id: Int, name: Name, address: String)
@@ -66,13 +73,14 @@ class ParquetSchemaPruningSuite
     pets: Int,
     friends: Array[FullName] = Array(),
     relatives: Map[String, FullName] = Map(),
+    employer: Employer = null,
     p: Int)
 
  case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)
 
   private val contactsWithDataPartitionColumn =
-    contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
-      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
+    contacts.map { case Contact(id, name, address, pets, friends, relatives, employer) =>
+      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, employer, 1) }
   private val briefContactsWithDataPartitionColumn =
     briefContacts.map { case BriefContact(id, name, address) =>
       BriefContactWithDataPartitionColumn(id, name, address, 2) }
@@ -155,6 +163,60 @@ class ParquetSchemaPruningSuite
       Row(null) :: Row(null) :: Nil)
   }
 
+  testSchemaPruning("select a single complex field and in where clause") {
+    val query1 = sql("select name.first from contacts where name.first = 'Jane'")
+    checkScan(query1, "struct<name:struct<first:string>>")
+    checkAnswer(query1, Row("Jane") :: Nil)
+
+    val query2 = sql("select name.first, name.last from contacts where name.first = 'Jane'")
+    checkScan(query2, "struct<name:struct<first:string,last:string>>")
+    checkAnswer(query2, Row("Jane", "Doe") :: Nil)
+
+    val query3 = sql("select name.first from contacts " +
+      "where employer.company.name = 'abc' and p = 1")
+    checkScan(query3, "struct<name:struct<first:string>," +
+      "employer:struct<company:struct<name:string>>>")
+    checkAnswer(query3, Row("Jane") :: Nil)
+
+    val query4 = sql("select name.first, employer.company.name from contacts " +
+      "where employer.company is not null and p = 1")
+    checkScan(query4, "struct<name:struct<first:string>," +
+      "employer:struct<company:struct<name:string>>>")
+    checkAnswer(query4, Row("Jane", "abc") :: Nil)
+  }
+
+  testSchemaPruning("select nullable complex field and having is not null predicate") {
+    val query = sql("select employer.company from contacts " +
+      "where employer is not null and p = 1")
+    checkScan(query, "struct<employer:struct<company:struct<name:string,address:string>>>")
+    checkAnswer(query, Row(Row("abc", "123 Business Street")) :: Row(null) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field and is null expression in project") {
+    val query = sql("select name.first, address is not null from contacts")
+    checkScan(query, "struct<name:struct<first:string>,address:string>")
+    checkAnswer(query.orderBy("id"),
+      Row("Jane", true) :: Row("John", true) :: Row("Janet", true) :: Row("Jim", true) ::
Nil)
+  }
+
+  testSchemaPruning("select a single complex field array and in clause") {
+    val query = sql("select friends.middle from contacts where friends.first[0] = 'Susan'")
+    checkScan(query,
+      "struct<friends:array<struct<first:string,middle:string>>>")
+    checkAnswer(query.orderBy("id"),
+      Row(Array("Z.")) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field from a map entry and in clause") {
+    val query =
+      sql("select relatives[\"brother\"].middle from contacts " +
+        "where relatives[\"brother\"].first = 'John'")
+    checkScan(query,
+      "struct<relatives:map<string,struct<first:string,middle:string>>>")
+    checkAnswer(query.orderBy("id"),
+      Row("Y.") :: Nil)
+  }
+
   private def testSchemaPruning(testName: String)(testThunk: => Unit) {
     withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
       test(s"Spark vectorized reader - without partition data column - $testName") {
@@ -238,10 +300,7 @@ class ParquetSchemaPruningSuite
 
   testMixedCasePruning("filter with different-case column names") {
     val query = sql("select id from mixedcase where Col2.b = 2")
-    // Pruning with filters is currently unsupported. As-is, the file reader will read the id column
-    // and the entire coL2 struct. Once pruning with filters has been implemented we can uncomment
-    // this line
-    // checkScan(query, "struct<id:int,coL2:struct<B:int>>")
+    checkScan(query, "struct<id:int,coL2:struct<B:int>>")
     checkAnswer(query.orderBy("id"), Row(1) :: Nil)
   }
 

