spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject spark git commit: [SPARK-18753][SQL] Keep pushed-down null literal as a filter in Spark-side post-filter for FileFormat datasources
Date Wed, 14 Dec 2016 19:29:15 GMT
Repository: spark
Updated Branches:
  refs/heads/master 169b9d73e -> 89ae26dcd


[SPARK-18753][SQL] Keep pushed-down null literal as a filter in Spark-side post-filter for
FileFormat datasources

## What changes were proposed in this pull request?

Currently, `FileSourceStrategy` does not handle the case when the pushed-down filter is `Literal(null)`
and removes it at the post-filter in Spark-side.

For example, the codes below:

```scala
val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
df.filter($"_1" === "true").explain(true)
```

shows it keeps `null` properly.

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- LocalRelation [_1#17]

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#17 as double) = cast(true as double))
+- LocalRelation [_1#17]

== Optimized Logical Plan ==
Filter (isnotnull(_1#17) && null)
+- LocalRelation [_1#17]

== Physical Plan ==
*Filter (isnotnull(_1#17) && null)       << Here `null` is there
+- LocalTableScan [_1#17]
```

However, when we read it back from Parquet,

```scala
val path = "/tmp/testfile"
df.write.parquet(path)
spark.read.parquet(path).filter($"_1" === "true").explain(true)
```

`null` is removed at the post-filter.

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#11] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#11 as double) = cast(true as double))
+- Relation[_1#11] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#11) && null)
+- Relation[_1#11] parquet

== Physical Plan ==
*Project [_1#11]
+- *Filter isnotnull(_1#11)       << Here `null` is missing
   +- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/tmp/testfile],
PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

This PR fixes it to keep it properly. In more details,

```scala
val partitionKeyFilters =
  ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
```

This keeps this `null` in `partitionKeyFilters` as `Literal` always don't have `children`
and `references` is being empty  which is always the subset of `partitionSet`.

And then in

```scala
val afterScanFilters = filterSet -- partitionKeyFilters
```

`null` is always removed from the post filter. So, if the referenced fields are empty, it
should be applied into data columns too.

After this PR, it becomes as below:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#276] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#276 as double) = cast(true as double))
+- Relation[_1#276] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#276) && null)
+- Relation[_1#276] parquet

== Physical Plan ==
*Project [_1#276]
+- *Filter (isnotnull(_1#276) && null)
   +- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b...,
PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

## How was this patch tested?

Unit test in `FileSourceStrategySuite`

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16184 from HyukjinKwon/SPARK-18753.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89ae26dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89ae26dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89ae26dc

Branch: refs/heads/master
Commit: 89ae26dcdb73266fbc3a8b6da9f5dff30dc4ec95
Parents: 169b9d7
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Wed Dec 14 11:29:11 2016 -0800
Committer: Cheng Lian <lian@databricks.com>
Committed: Wed Dec 14 11:29:11 2016 -0800

----------------------------------------------------------------------
 .../sql/execution/datasources/FileSourceStrategy.scala   |  2 +-
 .../execution/datasources/FileSourceStrategySuite.scala  | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/89ae26dc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 55ca4f1..ead3233 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -86,7 +86,7 @@ object FileSourceStrategy extends Strategy with Logging {
       val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
 
       // Predicates with both partition keys and attributes need to be evaluated after the
scan.
-      val afterScanFilters = filterSet -- partitionKeyFilters
+      val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
       logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")
 
       val filterAttributes = AttributeSet(afterScanFilters)

http://git-wip-us.apache.org/repos/asf/spark/blob/89ae26dc/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index d900ce7..f361628 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -476,6 +476,17 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext
with Predi
     }
   }
 
+  test("[SPARK-18753] keep pushed-down null literal as a filter in Spark-side post-filter")
{
+    val ds = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDS()
+    withTempPath { p =>
+      val path = p.getAbsolutePath
+      ds.write.parquet(path)
+      val readBack = spark.read.parquet(path).filter($"_1" === "true")
+      val filtered = ds.filter($"_1" === "true").toDF()
+      checkAnswer(readBack, filtered)
+    }
+  }
+
   // Helpers for checking the arguments passed to the FileFormat.
 
   protected val checkPartitionSchema =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message