spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-12218][SQL] Invalid splitting of nested AND expressions in Data Source filter API
Date Fri, 18 Dec 2015 18:55:49 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d2f71c27c -> afffe24c0


[SPARK-12218][SQL] Invalid splitting of nested AND expressions in Data Source filter API

JIRA: https://issues.apache.org/jira/browse/SPARK-12218

When creating filters for Parquet/ORC, we should not push nested AND expressions partially.

Author: Yin Huai <yhuai@databricks.com>

Closes #10362 from yhuai/SPARK-12218.

(cherry picked from commit 41ee7c57abd9f52065fd7ffb71a8af229603371d)
Signed-off-by: Yin Huai <yhuai@databricks.com>

Conflicts:
	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afffe24c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afffe24c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afffe24c

Branch: refs/heads/branch-1.5
Commit: afffe24c00f49c199383bd07530266a8124d77f2
Parents: d2f71c2
Author: Yin Huai <yhuai@databricks.com>
Authored: Fri Dec 18 10:52:14 2015 -0800
Committer: Yin Huai <yhuai@databricks.com>
Committed: Fri Dec 18 10:55:21 2015 -0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFilters.scala    | 12 ++++++++++-
 .../parquet/ParquetFilterSuite.scala            | 19 +++++++++++++++++
 .../apache/spark/sql/hive/orc/OrcFilters.scala  | 22 +++++++++-----------
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 20 ++++++++++++++++++
 4 files changed, 60 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/afffe24c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 1f0405f..3f7a409 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -264,7 +264,17 @@ private[sql] object ParquetFilters {
         makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
 
       case sources.And(lhs, rhs) =>
-        (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
+        // At here, it is not safe to just convert one side if we do not understand the
+        // other side. Here is an example used to explain the reason.
+        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
+        // convert b in ('1'). If we only convert a = 2, we will end up with a filter
+        // NOT(a = 2), which will generate wrong results.
+        // Pushing one side of AND down is only safe to do at the top level.
+        // You can see ParquetRelation's initializeLocalJobFunc method as an example.
+        for {
+          lhsFilter <- createFilter(schema, lhs)
+          rhsFilter <- createFilter(schema, rhs)
+        } yield FilterApi.and(lhsFilter, rhsFilter)
 
       case sources.Or(lhs, rhs) =>
         for {

http://git-wip-us.apache.org/repos/asf/spark/blob/afffe24c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index f88ddc7..05d305a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -336,4 +336,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/table1"
+        (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.parquet(path)
+
+        checkAnswer(
+          sqlContext.read.parquet(path).where("not (a = 2) or not(b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+        checkAnswer(
+          sqlContext.read.parquet(path).where("not (a = 2 and b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/afffe24c/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 27193f5..ebfb175 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -74,22 +74,20 @@ private[orc] object OrcFilters extends Logging {
 
     expression match {
       case And(left, right) =>
-        val tryLeft = buildSearchArgument(left, newBuilder)
-        val tryRight = buildSearchArgument(right, newBuilder)
-
-        val conjunction = for {
-          _ <- tryLeft
-          _ <- tryRight
+        // At here, it is not safe to just convert one side if we do not understand the
+        // other side. Here is an example used to explain the reason.
+        // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
+        // convert b in ('1'). If we only convert a = 2, we will end up with a filter
+        // NOT(a = 2), which will generate wrong results.
+        // Pushing one side of AND down is only safe to do at the top level.
+        // You can see ParquetRelation's initializeLocalJobFunc method as an example.
+        for {
+          _ <- buildSearchArgument(left, newBuilder)
+          _ <- buildSearchArgument(right, newBuilder)
           lhs <- buildSearchArgument(left, builder.startAnd())
           rhs <- buildSearchArgument(right, lhs)
         } yield rhs.end()
 
-        // For filter `left AND right`, we can still push down `left` even if `right` is not
-        // convertible, and vice versa.
-        conjunction
-          .orElse(tryLeft.flatMap(_ => buildSearchArgument(left, builder)))
-          .orElse(tryRight.flatMap(_ => buildSearchArgument(right, builder)))
-
       case Or(left, right) =>
         for {
           _ <- buildSearchArgument(left, newBuilder)

http://git-wip-us.apache.org/repos/asf/spark/blob/afffe24c/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 593e689..6bcc644 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.orc
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{Row, SQLConf}
 import org.apache.spark.sql.sources.HadoopFsRelationTest
 import org.apache.spark.sql.types._
 
@@ -61,4 +62,23 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
           "dataSchema" -> dataSchemaWithPartition.json)).format(dataSourceName).load())
     }
   }
+
+  test("SPARK-12218: 'Not' is included in ORC filter pushdown") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/table1"
+        (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path)
+
+        checkAnswer(
+          sqlContext.read.orc(path).where("not (a = 2) or not(b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+        checkAnswer(
+          sqlContext.read.orc(path).where("not (a = 2 and b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message