spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-22592][SQL] cleanup filter converting for hive
Date Thu, 23 Nov 2017 23:33:30 GMT
Repository: spark
Updated Branches:
  refs/heads/master 42f83d7c4 -> c1217565e


[SPARK-22592][SQL] cleanup filter converting for hive

## What changes were proposed in this pull request?

We have 2 different methods to convert filters for hive, regarding a config. This introduces
duplicated and inconsistent code (e.g. one uses helper objects for pattern matching and one doesn't).

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19801 from cloud-fan/cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1217565
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1217565
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1217565

Branch: refs/heads/master
Commit: c1217565e20bd3297f3b1bc8f18f5dea933211c0
Parents: 42f83d7
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Thu Nov 23 15:33:26 2017 -0800
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Thu Nov 23 15:33:26 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/hive/client/HiveShim.scala | 144 +++++++++----------
 1 file changed, 69 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c1217565/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index bd1b300..1eac70d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -585,53 +585,17 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
    * Unsupported predicates are skipped.
    */
   def convertFilters(table: Table, filters: Seq[Expression]): String = {
-    if (SQLConf.get.advancedPartitionPredicatePushdownEnabled) {
-      convertComplexFilters(table, filters)
-    } else {
-      convertBasicFilters(table, filters)
-    }
-  }
-
-
-  /**
-   * An extractor that matches all binary comparison operators except null-safe equality.
-   *
-   * Null-safe equality is not supported by Hive metastore partition predicate pushdown
-   */
-  object SpecialBinaryComparison {
-    def unapply(e: BinaryComparison): Option[(Expression, Expression)] = e match {
-      case _: EqualNullSafe => None
-      case _ => Some((e.left, e.right))
+    /**
+     * An extractor that matches all binary comparison operators except null-safe equality.
+     *
+     * Null-safe equality is not supported by Hive metastore partition predicate pushdown
+     */
+    object SpecialBinaryComparison {
+      def unapply(e: BinaryComparison): Option[(Expression, Expression)] = e match {
+        case _: EqualNullSafe => None
+        case _ => Some((e.left, e.right))
+      }
     }
-  }
-
-  private def convertBasicFilters(table: Table, filters: Seq[Expression]): String = {
-    // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
-    lazy val varcharKeys = table.getPartitionKeys.asScala
-      .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
-        col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
-      .map(col => col.getName).toSet
-
-    filters.collect {
-      case op @ SpecialBinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
-        s"${a.name} ${op.symbol} $v"
-      case op @ SpecialBinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
-        s"$v ${op.symbol} ${a.name}"
-      case op @ SpecialBinaryComparison(a: Attribute, Literal(v, _: StringType))
-        if !varcharKeys.contains(a.name) =>
-        s"""${a.name} ${op.symbol} ${quoteStringLiteral(v.toString)}"""
-      case op @ SpecialBinaryComparison(Literal(v, _: StringType), a: Attribute)
-        if !varcharKeys.contains(a.name) =>
-        s"""${quoteStringLiteral(v.toString)} ${op.symbol} ${a.name}"""
-    }.mkString(" and ")
-  }
-
-  private def convertComplexFilters(table: Table, filters: Seq[Expression]): String = {
-    // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
-    lazy val varcharKeys = table.getPartitionKeys.asScala
-      .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
-        col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
-      .map(col => col.getName).toSet
 
     object ExtractableLiteral {
       def unapply(expr: Expression): Option[String] = expr match {
@@ -643,9 +607,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
 
     object ExtractableLiterals {
       def unapply(exprs: Seq[Expression]): Option[Seq[String]] = {
-        exprs.map(ExtractableLiteral.unapply).foldLeft(Option(Seq.empty[String])) {
-          case (Some(accum), Some(value)) => Some(accum :+ value)
-          case _ => None
+        val extractables = exprs.map(ExtractableLiteral.unapply)
+        if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
+          Some(extractables.map(_.get))
+        } else {
+          None
         }
       }
     }
@@ -660,40 +626,68 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       }
 
       def unapply(values: Set[Any]): Option[Seq[String]] = {
-        values.toSeq.foldLeft(Option(Seq.empty[String])) {
-          case (Some(accum), value) if valueToLiteralString.isDefinedAt(value) =>
-            Some(accum :+ valueToLiteralString(value))
-          case _ => None
+        val extractables = values.toSeq.map(valueToLiteralString.lift)
+        if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
+          Some(extractables.map(_.get))
+        } else {
+          None
         }
       }
     }
 
-    def convertInToOr(a: Attribute, values: Seq[String]): String = {
-      values.map(value => s"${a.name} = $value").mkString("(", " or ", ")")
+    object NonVarcharAttribute {
+      // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
+      private val varcharKeys = table.getPartitionKeys.asScala
+        .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
+          col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
+        .map(col => col.getName).toSet
+
+      def unapply(attr: Attribute): Option[String] = {
+        if (varcharKeys.contains(attr.name)) {
+          None
+        } else {
+          Some(attr.name)
+        }
+      }
+    }
+
+    def convertInToOr(name: String, values: Seq[String]): String = {
+      values.map(value => s"$name = $value").mkString("(", " or ", ")")
     }
 
-    lazy val convert: PartialFunction[Expression, String] = {
-      case In(a: Attribute, ExtractableLiterals(values))
-          if !varcharKeys.contains(a.name) && values.nonEmpty =>
-        convertInToOr(a, values)
-      case InSet(a: Attribute, ExtractableValues(values))
-          if !varcharKeys.contains(a.name) && values.nonEmpty =>
-        convertInToOr(a, values)
-      case op @ SpecialBinaryComparison(a: Attribute, ExtractableLiteral(value))
-          if !varcharKeys.contains(a.name) =>
-        s"${a.name} ${op.symbol} $value"
-      case op @ SpecialBinaryComparison(ExtractableLiteral(value), a: Attribute)
-          if !varcharKeys.contains(a.name) =>
-        s"$value ${op.symbol} ${a.name}"
-      case And(expr1, expr2)
-          if convert.isDefinedAt(expr1) || convert.isDefinedAt(expr2) =>
-        (convert.lift(expr1) ++ convert.lift(expr2)).mkString("(", " and ", ")")
-      case Or(expr1, expr2)
-          if convert.isDefinedAt(expr1) && convert.isDefinedAt(expr2) =>
-        s"(${convert(expr1)} or ${convert(expr2)})"
+    val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled
+
+    def convert(expr: Expression): Option[String] = expr match {
+      case In(NonVarcharAttribute(name), ExtractableLiterals(values)) if useAdvanced =>
+        Some(convertInToOr(name, values))
+
+      case InSet(NonVarcharAttribute(name), ExtractableValues(values)) if useAdvanced =>
+        Some(convertInToOr(name, values))
+
+      case op @ SpecialBinaryComparison(NonVarcharAttribute(name), ExtractableLiteral(value)) =>
+        Some(s"$name ${op.symbol} $value")
+
+      case op @ SpecialBinaryComparison(ExtractableLiteral(value), NonVarcharAttribute(name)) =>
+        Some(s"$value ${op.symbol} $name")
+
+      case And(expr1, expr2) if useAdvanced =>
+        val converted = convert(expr1) ++ convert(expr2)
+        if (converted.isEmpty) {
+          None
+        } else {
+          Some(converted.mkString("(", " and ", ")"))
+        }
+
+      case Or(expr1, expr2) if useAdvanced =>
+        for {
+          left <- convert(expr1)
+          right <- convert(expr2)
+        } yield s"($left or $right)"
+
+      case _ => None
     }
 
-    filters.map(convert.lift).collect { case Some(filterString) => filterString }.mkString(" and ")
+    filters.flatMap(convert).mkString(" and ")
   }
 
   private def quoteStringLiteral(str: String): String = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message