spark-commits mailing list archives

From l...@apache.org
Subject spark git commit: [SPARK-15916][SQL] JDBC filter push down should respect operator precedence
Date Sat, 18 Jun 2016 00:11:53 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ca0802fd5 -> b22b20db6


[SPARK-15916][SQL] JDBC filter push down should respect operator precedence

## What changes were proposed in this pull request?

This PR fixes a problem where operator precedence is lost when a where-clause expression is
pushed down to the JDBC layer.

**Case 1:**

For the SQL `select * from table where (a or b) and c`, the where-clause is wrongly converted
to the JDBC where-clause `a or (b and c)` after filter push down. As a result, the JDBC source
may return fewer or more rows than expected.
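
A minimal sketch of Case 1, assuming the compiled filters are plain strings. `PrecedenceSketch`, `joinUnwrapped` and `joinWrapped` are hypothetical names for illustration only and are not part of Spark. Because SQL's `AND` binds tighter than `OR`, joining unparenthesized fragments with `" AND "` changes the grouping:

```scala
// Hypothetical illustration only; these helpers are not Spark APIs.
object PrecedenceSketch {
  // Joins compiled filter strings without parentheses (the pre-patch behavior).
  def joinUnwrapped(compiled: Seq[String]): String =
    compiled.mkString(" AND ")

  // Wraps each compiled filter in parentheses before joining (the patched behavior).
  def joinWrapped(compiled: Seq[String]): String =
    compiled.map(p => s"($p)").mkString(" AND ")

  def main(args: Array[String]): Unit = {
    // Assumed inputs: the filters for `(a OR b) AND c`, one string per filter.
    val compiled = Seq("a OR b", "c")
    println(joinUnwrapped(compiled)) // a OR b AND c, which SQL reads as a OR (b AND c)
    println(joinWrapped(compiled))   // (a OR b) AND (c)
  }
}
```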

**Case 2:**

For the SQL `select * from table where always_false_condition`, the result may be non-empty
if the JDBC RDD is partitioned using where-clauses:
```
spark.read.jdbc(url, table, predicates = Array("partition 1 where clause", "partition 2 where clause"...)
```
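
A minimal sketch of Case 2, reusing the conditions from the unit test in this patch. `PartitionClauseSketch`, `whereUnwrapped` and `whereWrapped` are hypothetical names and only approximate what `getWhereClause` does when both a filter clause and a partition clause are present:

```scala
// Hypothetical illustration only; these helpers are not Spark APIs.
object PartitionClauseSketch {
  // Concatenates the two clauses without parentheses (the pre-patch behavior).
  def whereUnwrapped(filterClause: String, partitionClause: String): String =
    "WHERE " + filterClause + " AND " + partitionClause

  // Parenthesizes both clauses before combining them (the patched behavior).
  def whereWrapped(filterClause: String, partitionClause: String): String =
    "WHERE " + s"($filterClause)" + " AND " + s"(${partitionClause})"

  def main(args: Array[String]): Unit = {
    val filterClause = "THEID < -1000000000" // always false in the test data
    val partitionClause = "THEID > 1000000000 OR NAME != 'non_exists'" // always true

    // Without parentheses the trailing OR escapes the AND, so every row matches.
    println(whereUnwrapped(filterClause, partitionClause))
    // With parentheses the always-false filter still applies to the whole partition clause.
    println(whereWrapped(filterClause, partitionClause))
  }
}
```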

## How was this patch tested?

Unit test.

This PR also closes #13640

Author: hyukjinkwon <gurwls223@gmail.com>
Author: Sean Zhong <seanzhong@databricks.com>

Closes #13743 from clockfly/SPARK-15916.

(cherry picked from commit ebb9a3b6fd834e2c856a192b4455aab83e9c4dc8)
Signed-off-by: Cheng Lian <lian@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b22b20db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b22b20db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b22b20db

Branch: refs/heads/branch-2.0
Commit: b22b20db640e9fac20c5d54cc83964dc74393821
Parents: ca0802f
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Fri Jun 17 17:11:38 2016 -0700
Committer: Cheng Lian <lian@databricks.com>
Committed: Fri Jun 17 17:11:50 2016 -0700

----------------------------------------------------------------------
 .../execution/datasources/jdbc/JDBCRDD.scala    |  4 +--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 26 ++++++++++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b22b20db/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 8d0906e..44cfbb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -305,14 +305,14 @@ private[sql] class JDBCRDD(
    * `filters`, but as a WHERE clause suitable for injection into a SQL query.
    */
   private val filterWhereClause: String =
-    filters.flatMap(JDBCRDD.compileFilter).mkString(" AND ")
+    filters.flatMap(JDBCRDD.compileFilter).map(p => s"($p)").mkString(" AND ")
 
   /**
    * A WHERE clause representing both `filters`, if any, and the current partition.
    */
   private def getWhereClause(part: JDBCPartition): String = {
     if (part.whereClause != null && filterWhereClause.length > 0) {
-      "WHERE " + filterWhereClause + " AND " + part.whereClause
+      "WHERE " + s"($filterWhereClause)" + " AND " + s"(${part.whereClause})"
     } else if (part.whereClause != null) {
       "WHERE " + part.whereClause
     } else if (filterWhereClause.length > 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b22b20db/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index abb7918..d6ec40c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -661,4 +661,30 @@ class JDBCSuite extends SparkFunSuite
     assert(oracleDialect.getJDBCType(StringType).
       map(_.databaseTypeDefinition).get == "VARCHAR2(255)")
   }
+
+  private def assertEmptyQuery(sqlString: String): Unit = {
+    assert(sql(sqlString).collect().isEmpty)
+  }
+
+  test("SPARK-15916: JDBC filter operator push down should respect operator precedence") {
+    val TRUE = "NAME != 'non_exists'"
+    val FALSE1 = "THEID > 1000000000"
+    val FALSE2 = "THEID < -1000000000"
+
+    assertEmptyQuery(s"SELECT * FROM foobar WHERE ($TRUE OR $FALSE1) AND $FALSE2")
+    assertEmptyQuery(s"SELECT * FROM foobar WHERE $FALSE1 AND ($FALSE2 OR $TRUE)")
+
+    // Tests JDBCPartition whereClause clause push down.
+    withTempTable("tempFrame") {
+      val jdbcPartitionWhereClause = s"$FALSE1 OR $TRUE"
+      val df = spark.read.jdbc(
+        urlWithUserAndPass,
+        "TEST.PEOPLE",
+        predicates = Array[String](jdbcPartitionWhereClause),
+        new Properties)
+
+      df.createOrReplaceTempView("tempFrame")
+      assertEmptyQuery(s"SELECT * FROM tempFrame where $FALSE2")
+    }
+  }
 }



