spark-commits mailing list archives

From lix...@apache.org
Subject spark git commit: [SPARK-20331][SQL][FOLLOW-UP] Add a SQLConf for enhanced Hive partition pruning predicate pushdown
Date Sat, 21 Oct 2017 17:05:51 GMT
Repository: spark
Updated Branches:
  refs/heads/master d9f286d26 -> d8cada8d1


[SPARK-20331][SQL][FOLLOW-UP] Add a SQLConf for enhanced Hive partition pruning predicate pushdown

## What changes were proposed in this pull request?
This is a follow-up PR of https://github.com/apache/spark/pull/17633.

This PR adds a conf `spark.sql.hive.advancedPartitionPredicatePushdown.enabled`, which can be used to turn the enhancement off.
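
For reference, a minimal sketch (not part of this commit) of how the flag could be toggled at runtime; the conf name and its default of `true` come from the diff below, while the session setup here is purely illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: build a Hive-enabled session and disable the enhanced
// pushdown, falling back to the pre-SPARK-20331 filter conversion.
val spark = SparkSession.builder()
  .appName("pushdown-toggle")
  .enableHiveSupport()
  .getOrCreate()

// The conf is internal and defaults to true.
spark.conf.set("spark.sql.hive.advancedPartitionPredicatePushdown.enabled", "false")

// Or equivalently via SQL:
spark.sql("SET spark.sql.hive.advancedPartitionPredicatePushdown.enabled=false")
```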

## How was this patch tested?
Add a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19547 from gatorsmile/Spark20331FollowUp.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8cada8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8cada8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8cada8d

Branch: refs/heads/master
Commit: d8cada8d1d3fce979a4bc1f9879593206722a3b9
Parents: d9f286d
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Sat Oct 21 10:05:45 2017 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Sat Oct 21 10:05:45 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala | 10 +++++++
 .../apache/spark/sql/hive/client/HiveShim.scala | 29 +++++++++++++++++++
 .../spark/sql/hive/client/FiltersSuite.scala    | 30 ++++++++++++++++----
 3 files changed, 64 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d8cada8d/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 618d4a0..4cfe53b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -173,6 +173,13 @@ object SQLConf {
     .intConf
     .createWithDefault(4)
 
+  val ADVANCED_PARTITION_PREDICATE_PUSHDOWN =
+    buildConf("spark.sql.hive.advancedPartitionPredicatePushdown.enabled")
+      .internal()
+      .doc("When true, advanced partition predicate pushdown into Hive metastore is enabled.")
+      .booleanConf
+      .createWithDefault(true)
+
   val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
     buildConf("spark.sql.statistics.fallBackToHdfs")
     .doc("If the table statistics are not available from table metadata enable fall back
to hdfs." +
@@ -1092,6 +1099,9 @@ class SQLConf extends Serializable with Logging {
 
   def limitScaleUpFactor: Int = getConf(LIMIT_SCALE_UP_FACTOR)
 
+  def advancedPartitionPredicatePushdownEnabled: Boolean =
+    getConf(ADVANCED_PARTITION_PREDICATE_PUSHDOWN)
+
   def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
 
   def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
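
For context, a short sketch (not part of the diff) of how the new entry can be read elsewhere in the code; the typed accessor and the conf constant are both added by this patch, while the standalone lookup shown here is illustrative:

```scala
import org.apache.spark.sql.internal.SQLConf

// Typed accessor added by this patch; returns true unless the flag is overridden.
val enabled: Boolean = SQLConf.get.advancedPartitionPredicatePushdownEnabled

// Equivalent lookup through the conf entry itself.
val alsoEnabled: Boolean = SQLConf.get.getConf(SQLConf.ADVANCED_PARTITION_PREDICATE_PUSHDOWN)
```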

http://git-wip-us.apache.org/repos/asf/spark/blob/d8cada8d/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index cde20da..5c1ff2b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -585,6 +585,35 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
    * Unsupported predicates are skipped.
    */
   def convertFilters(table: Table, filters: Seq[Expression]): String = {
+    if (SQLConf.get.advancedPartitionPredicatePushdownEnabled) {
+      convertComplexFilters(table, filters)
+    } else {
+      convertBasicFilters(table, filters)
+    }
+  }
+
+  private def convertBasicFilters(table: Table, filters: Seq[Expression]): String = {
+    // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
+    lazy val varcharKeys = table.getPartitionKeys.asScala
+      .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
+        col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
+      .map(col => col.getName).toSet
+
+    filters.collect {
+      case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
+        s"${a.name} ${op.symbol} $v"
+      case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
+        s"$v ${op.symbol} ${a.name}"
+      case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
+        if !varcharKeys.contains(a.name) =>
+        s"""${a.name} ${op.symbol} ${quoteStringLiteral(v.toString)}"""
+      case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
+        if !varcharKeys.contains(a.name) =>
+        s"""${quoteStringLiteral(v.toString)} ${op.symbol} ${a.name}"""
+    }.mkString(" and ")
+  }
+
+  private def convertComplexFilters(table: Table, filters: Seq[Expression]): String = {
     // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
     lazy val varcharKeys = table.getPartitionKeys.asScala
       .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||

http://git-wip-us.apache.org/repos/asf/spark/blob/d8cada8d/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
index 031c1a5..1976569 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -26,13 +26,15 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
  * A set of tests for the filter conversion logic used when pushing partition pruning into the
  * metastore
  */
-class FiltersSuite extends SparkFunSuite with Logging {
+class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
   private val shim = new Shim_v0_13
 
   private val testTable = new org.apache.hadoop.hive.ql.metadata.Table("default", "test")
@@ -72,10 +74,28 @@ class FiltersSuite extends SparkFunSuite with Logging {
 
   private def filterTest(name: String, filters: Seq[Expression], result: String) = {
     test(name) {
-      val converted = shim.convertFilters(testTable, filters)
-      if (converted != result) {
-        fail(
-          s"Expected filters ${filters.mkString(",")} to convert to '$result' but got '$converted'")
+      withSQLConf(SQLConf.ADVANCED_PARTITION_PREDICATE_PUSHDOWN.key -> "true") {
+        val converted = shim.convertFilters(testTable, filters)
+        if (converted != result) {
+          fail(s"Expected ${filters.mkString(",")} to convert to '$result' but got '$converted'")
+        }
+      }
+    }
+  }
+
+  test("turn on/off ADVANCED_PARTITION_PREDICATE_PUSHDOWN") {
+    import org.apache.spark.sql.catalyst.dsl.expressions._
+    Seq(true, false).foreach { enabled =>
+      withSQLConf(SQLConf.ADVANCED_PARTITION_PREDICATE_PUSHDOWN.key -> enabled.toString) {
+        val filters =
+          (Literal(1) === a("intcol", IntegerType) ||
+            Literal(2) === a("intcol", IntegerType)) :: Nil
+        val converted = shim.convertFilters(testTable, filters)
+        if (enabled) {
+          assert(converted == "(1 = intcol or 2 = intcol)")
+        } else {
+          assert(converted.isEmpty)
+        }
       }
     }
   }
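
For reference, a sketch of what the new test exercises; `shim`, `testTable`, and the `a(...)` attribute helper all come from the suite's setup (assumed here), and the expected results come from the assertions above:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.IntegerType

// A disjunction over a single partition column, as in the test above.
val filters =
  (Literal(1) === a("intcol", IntegerType) || Literal(2) === a("intcol", IntegerType)) :: Nil

// With spark.sql.hive.advancedPartitionPredicatePushdown.enabled=true (the default),
// convertFilters pushes the whole disjunction: "(1 = intcol or 2 = intcol)".
// With it set to false, the basic path only handles simple attribute-vs-literal
// comparisons, so the disjunction is skipped and an empty string is returned.
val converted = shim.convertFilters(testTable, filters)
```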



