spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-16371][SQL] Do not push down filters incorrectly when inner name and outer name are the same in Parquet
Date Wed, 06 Jul 2016 19:42:27 GMT
Repository: spark
Updated Branches:
  refs/heads/master 480357cc6 -> 4f8ceed59


[SPARK-16371][SQL] Do not push down filters incorrectly when inner name and outer name are
the same in Parquet

## What changes were proposed in this pull request?

Currently, if there is a schema as below:

```
root
  |-- _1: struct (nullable = true)
  |    |-- _1: integer (nullable = true)
```

and if we execute the codes below:

```scala
df.filter("_1 IS NOT NULL").count()
```

This pushes down a filter although this filter is being applied to `StructType`.(If my understanding
is correct, Spark does not pushes down filters for those).

The reason is, `ParquetFilters.getFieldMap` produces results below:

```
(_1,StructType(StructField(_1,IntegerType,true)))
(_1,IntegerType)
```

and then it becomes a `Map`

```
(_1,IntegerType)
```

Now, because of ` ....lift(dataTypeOf(name)).map(_(name, value))`, this pushes down filters
for `_1` which Parquet thinks is `IntegerType`. However, it is actually `StructType`.

So, Parquet filter2 produces incorrect results, for example, the codes below:

```
df.filter("_1 IS NOT NULL").count()
```

produces always 0.

This PR prevents this by not finding nested fields.

## How was this patch tested?

Unit test in `ParquetFilterSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14067 from HyukjinKwon/SPARK-16371.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f8ceed5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f8ceed5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f8ceed5

Branch: refs/heads/master
Commit: 4f8ceed59367319300e4bfa5b957c387be81ffa3
Parents: 480357c
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Wed Jul 6 12:42:16 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Wed Jul 6 12:42:16 2016 -0700

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFileFormat.scala       |  2 +-
 .../datasources/parquet/ParquetFilters.scala          |  5 ++++-
 .../datasources/parquet/ParquetFilterSuite.scala      | 14 ++++++++++++++
 3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9833620..76d7f5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -438,7 +438,7 @@ private[sql] class ParquetOutputWriterFactory(
     ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
     // We want to clear this temporary metadata from saving into Parquet file.
-    // This metadata is only useful for detecting optional columns when pushdowning filters.
+    // This metadata is only useful for detecting optional columns when pushing down filters.
     val dataSchemaToWrite = StructType.removeMetadata(
       StructType.metadataKeyForOptionalField,
       dataSchema).asInstanceOf[StructType]

http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7213a38..e0a113a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -185,10 +185,13 @@ private[sql] object ParquetFilters {
    */
   private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match
{
     case StructType(fields) =>
+      // Here we don't flatten the fields in the nested schema but just look up through
+      // root fields. Currently, accessing to nested fields does not push down filters
+      // and it does not support to create filters for them.
       fields.filter { f =>
         !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
           !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
-      }.map(f => f.name -> f.dataType) ++ fields.flatMap { f => getFieldMap(f.dataType)
}
+      }.map(f => f.name -> f.dataType)
     case _ => Array.empty[(String, DataType)]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 2a5666e..18a3128 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -543,4 +543,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("Do not push down filters incorrectly when inner name and outer name are the same")
{
+    withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
+      // Here the schema becomes as below:
+      //
+      // root
+      //  |-- _1: struct (nullable = true)
+      //  |    |-- _1: integer (nullable = true)
+      //
+      // The inner column name, `_1` and outer column name `_1` are the same.
+      // Obviously this should not push down filters because the outer column is struct.
+      assert(df.filter("_1 IS NOT NULL").count() === 4)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message