From: rxin@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-16371][SQL] Do not push down filters incorrectly when inner name and outer name are the same in Parquet
Date: Wed, 6 Jul 2016 19:42:27 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 480357cc6 -> 4f8ceed59

[SPARK-16371][SQL] Do not push down filters incorrectly when inner name and outer name are the same in Parquet

## What changes were proposed in this pull request?

Currently, given a schema such as:

```
root
 |-- _1: struct (nullable = true)
 |    |-- _1: integer (nullable = true)
```

executing the code below:

```scala
df.filter("_1 IS NOT NULL").count()
```

pushes down a filter even though the filter is being applied to a `StructType` column. (If my understanding is correct, Spark does not push down filters for struct columns.)

The reason is that `ParquetFilters.getFieldMap` produces the pairs below:

```
(_1,StructType(StructField(_1,IntegerType,true)))
(_1,IntegerType)
```

which then collapse into a `Map`:

```
(_1,IntegerType)
```

Because of `....lift(dataTypeOf(name)).map(_(name, value))`, a filter is then pushed down for `_1` as if it were `IntegerType`, although it is actually `StructType`. As a result, the Parquet filter2 API produces incorrect results. For example, the code below:

```scala
df.filter("_1 IS NOT NULL").count()
```

always produces 0. This PR prevents this by no longer descending into nested fields when building the field map.

## How was this patch tested?

Unit test in `ParquetFilterSuite`.

Author: hyukjinkwon

Closes #14067 from HyukjinKwon/SPARK-16371.
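For reference, here is a minimal sketch of how a DataFrame with this shape can be built, assuming a Spark 2.x session named `spark` (the names here are illustrative and not part of the patch):

```scala
// Build a DataFrame whose single struct column `_1` contains an inner
// field that is also named `_1`, matching the schema tree shown above.
import spark.implicits._

val df = (1 to 4).map(i => Tuple1(Tuple1(i))).toDF()
df.printSchema()  // prints the `root / _1: struct / _1: integer` tree

// Before this fix, filtering data of this shape read back from Parquet
// could return 0 instead of the expected 4 because of the bad pushdown.
df.filter("_1 IS NOT NULL").count()
```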
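The `Map` collision itself is plain Scala behavior; a small sketch (with the data types replaced by strings for illustration) shows how the later duplicate key silently wins:

```scala
// When an Array of (name, dataType) pairs with duplicate keys is
// converted to a Map, only the last entry per key survives.
val pairs = Array(
  "_1" -> "StructType(StructField(_1,IntegerType,true))", // outer struct column
  "_1" -> "IntegerType"                                   // nested field, same name
)
println(pairs.toMap)
// Map(_1 -> IntegerType) -- the struct entry is lost, so the pushdown
// code sees `_1` as IntegerType even though the column is a struct.
```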
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f8ceed5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f8ceed5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f8ceed5

Branch: refs/heads/master
Commit: 4f8ceed59367319300e4bfa5b957c387be81ffa3
Parents: 480357c
Author: hyukjinkwon
Authored: Wed Jul 6 12:42:16 2016 -0700
Committer: Reynold Xin
Committed: Wed Jul 6 12:42:16 2016 -0700

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFileFormat.scala  |  2 +-
 .../datasources/parquet/ParquetFilters.scala     |  5 ++++-
 .../datasources/parquet/ParquetFilterSuite.scala | 14 ++++++++++++++
 3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9833620..76d7f5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -438,7 +438,7 @@ private[sql] class ParquetOutputWriterFactory(
     ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
     // We want to clear this temporary metadata from saving into Parquet file.
-    // This metadata is only useful for detecting optional columns when pushdowning filters.
+    // This metadata is only useful for detecting optional columns when pushing down filters.
     val dataSchemaToWrite = StructType.removeMetadata(
       StructType.metadataKeyForOptionalField,
       dataSchema).asInstanceOf[StructType]

http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7213a38..e0a113a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -185,10 +185,13 @@ private[sql] object ParquetFilters {
    */
   private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match {
     case StructType(fields) =>
+      // Here we don't flatten the fields of the nested schema; we only look at the
+      // root fields. Currently, accessing nested fields does not push down filters,
+      // and creating filters for them is not supported.
       fields.filter { f =>
         !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
           !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
-      }.map(f => f.name -> f.dataType) ++ fields.flatMap { f => getFieldMap(f.dataType) }
+      }.map(f => f.name -> f.dataType)
     case _ => Array.empty[(String, DataType)]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 2a5666e..18a3128 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -543,4 +543,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("Do not push down filters incorrectly when inner name and outer name are the same") {
+    withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
+      // Here the schema becomes as below:
+      //
+      // root
+      //  |-- _1: struct (nullable = true)
+      //  |    |-- _1: integer (nullable = true)
+      //
+      // The inner column name `_1` and the outer column name `_1` are the same.
+      // Obviously this should not push down filters, because the outer column is a struct.
+      assert(df.filter("_1 IS NOT NULL").count() === 4)
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org