spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-25207][SQL] Case-insensitve field resolution for filter pushdown when reading Parquet
Date Fri, 31 Aug 2018 11:24:20 GMT
Repository: spark
Updated Branches:
  refs/heads/master 515708d5f -> 8d9495a8f


[SPARK-25207][SQL] Case-insensitve field resolution for filter pushdown when reading Parquet

## What changes were proposed in this pull request?

Currently, filter pushdown will not work if Parquet schema and Hive metastore schema are in
different letter cases even spark.sql.caseSensitive is false.

Like the below case:
```scala
spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 8 * 1024 * 1024)
spark.range(1, 40 * 1024 * 1024, 1, 1).sortWithinPartitions("id").write.parquet("/tmp/t")
sql("CREATE TABLE t (ID LONG) USING parquet LOCATION '/tmp/t'")
sql("select * from t where id < 100L").write.csv("/tmp/id")
```

Although filter "ID < 100L" is generated by Spark, it fails to pushdown into parquet actually,
Spark still does the full table scan when reading.
This PR provides a case-insensitive field resolution to make it work.

Before - "ID < 100L" fail to pushedown:
<img width="273" alt="screen shot 2018-08-23 at 10 08 26 pm" src="https://user-images.githubusercontent.com/2989575/44530558-40ef8b00-a721-11e8-8abc-7f97671590d3.png">
After - "ID < 100L" pushedown sucessfully:
<img width="267" alt="screen shot 2018-08-23 at 10 08 40 pm" src="https://user-images.githubusercontent.com/2989575/44530567-44831200-a721-11e8-8634-e9f664b33d39.png">

## How was this patch tested?

Added UTs.

Closes #22197 from yucai/SPARK-25207.

Authored-by: yucai <yyu1@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d9495a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d9495a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d9495a8

Branch: refs/heads/master
Commit: 8d9495a8f1e64dbc42c3741f9bcbd4893ce3f0e9
Parents: 515708d
Author: yucai <yyu1@ebay.com>
Authored: Fri Aug 31 19:24:09 2018 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Fri Aug 31 19:24:09 2018 +0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFileFormat.scala |   3 +-
 .../datasources/parquet/ParquetFilters.scala    |  90 ++++++++++-----
 .../parquet/ParquetFilterSuite.scala            | 115 ++++++++++++++++++-
 3 files changed, 179 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d9495a8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index d7eb143..ea4f159 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -347,6 +347,7 @@ class ParquetFileFormat
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
 
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
@@ -372,7 +373,7 @@ class ParquetFileFormat
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
         val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
-          pushDownStringStartWith, pushDownInFilterThreshold)
+          pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates
can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a
`flatMap`

http://git-wip-us.apache.org/repos/asf/spark/blob/8d9495a8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 58b4a76..0c286de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long
=> JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Timestamp}
+import java.util.Locale
 
 import scala.collection.JavaConverters.asScalaBufferConverter
 
@@ -31,7 +32,7 @@ import org.apache.parquet.schema.OriginalType._
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
 import org.apache.spark.unsafe.types.UTF8String
@@ -44,7 +45,18 @@ private[parquet] class ParquetFilters(
     pushDownTimestamp: Boolean,
     pushDownDecimal: Boolean,
     pushDownStartWith: Boolean,
-    pushDownInFilterThreshold: Int) {
+    pushDownInFilterThreshold: Int,
+    caseSensitive: Boolean) {
+
+  /**
+   * Holds a single field information stored in the underlying parquet file.
+   *
+   * @param fieldName field name in parquet file
+   * @param fieldType field type related info in parquet file
+   */
+  private case class ParquetField(
+      fieldName: String,
+      fieldType: ParquetSchemaType)
 
   private case class ParquetSchemaType(
       originalType: OriginalType,
@@ -350,25 +362,38 @@ private[parquet] class ParquetFilters(
   }
 
   /**
-   * Returns a map from name of the column to the data type, if predicate push down applies.
+   * Returns a map, which contains parquet field name and data type, if predicate push down
applies.
    */
-  private def getFieldMap(dataType: MessageType): Map[String, ParquetSchemaType] = dataType
match {
-    case m: MessageType =>
-      // Here we don't flatten the fields in the nested schema but just look up through
-      // root fields. Currently, accessing to nested fields does not push down filters
-      // and it does not support to create filters for them.
-      m.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
-        f.getName -> ParquetSchemaType(
-          f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata)
-      }.toMap
-    case _ => Map.empty[String, ParquetSchemaType]
+  private def getFieldMap(dataType: MessageType): Map[String, ParquetField] = {
+    // Here we don't flatten the fields in the nested schema but just look up through
+    // root fields. Currently, accessing to nested fields does not push down filters
+    // and it does not support to create filters for them.
+    val primitiveFields =
+      dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
+        f.getName -> ParquetField(f.getName,
+          ParquetSchemaType(f.getOriginalType,
+            f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
+      }
+    if (caseSensitive) {
+      primitiveFields.toMap
+    } else {
+      // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive
+      // mode, just skip pushdown for these fields, they will trigger Exception when reading,
+      // See: SPARK-25132.
+      val dedupPrimitiveFields =
+        primitiveFields
+          .groupBy(_._1.toLowerCase(Locale.ROOT))
+          .filter(_._2.size == 1)
+          .mapValues(_.head._2)
+      CaseInsensitiveMap(dedupPrimitiveFields)
+    }
   }
 
   /**
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate]
= {
-    val nameToType = getFieldMap(schema)
+    val nameToParquetField = getFieldMap(schema)
 
     // Decimal type must make sure that filter value's scale matched the file.
     // If doesn't matched, which would cause data corruption.
@@ -381,7 +406,7 @@ private[parquet] class ParquetFilters(
     // Parquet's type in the given file should be matched to the value's type
     // in the pushed filter in order to push down the filter to Parquet.
     def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
-      value == null || (nameToType(name) match {
+      value == null || (nameToParquetField(name).fieldType match {
         case ParquetBooleanType => value.isInstanceOf[JBoolean]
         case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
         case ParquetLongType => value.isInstanceOf[JLong]
@@ -408,7 +433,7 @@ private[parquet] class ParquetFilters(
     // filters for the column having dots in the names. Thus, we do not push down such filters.
     // See SPARK-20364.
     def canMakeFilterOn(name: String, value: Any): Boolean = {
-      nameToType.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name,
value)
+      nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name,
value)
     }
 
     // NOTE:
@@ -428,29 +453,39 @@ private[parquet] class ParquetFilters(
 
     predicate match {
       case sources.IsNull(name) if canMakeFilterOn(name, null) =>
-        makeEq.lift(nameToType(name)).map(_(name, null))
+        makeEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, null))
       case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
-        makeNotEq.lift(nameToType(name)).map(_(name, null))
+        makeNotEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, null))
 
       case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
-        makeEq.lift(nameToType(name)).map(_(name, value))
+        makeEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
       case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) =>
-        makeNotEq.lift(nameToType(name)).map(_(name, value))
+        makeNotEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
 
       case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) =>
-        makeEq.lift(nameToType(name)).map(_(name, value))
+        makeEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
       case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value)
=>
-        makeNotEq.lift(nameToType(name)).map(_(name, value))
+        makeNotEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
 
       case sources.LessThan(name, value) if canMakeFilterOn(name, value) =>
-        makeLt.lift(nameToType(name)).map(_(name, value))
+        makeLt.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
       case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name, value) =>
-        makeLtEq.lift(nameToType(name)).map(_(name, value))
+        makeLtEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
 
       case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) =>
-        makeGt.lift(nameToType(name)).map(_(name, value))
+        makeGt.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
       case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name, value) =>
-        makeGtEq.lift(nameToType(name)).map(_(name, value))
+        makeGtEq.lift(nameToParquetField(name).fieldType)
+          .map(_(nameToParquetField(name).fieldName, value))
 
       case sources.And(lhs, rhs) =>
         // At here, it is not safe to just convert one side if we do not understand the
@@ -477,7 +512,8 @@ private[parquet] class ParquetFilters(
       case sources.In(name, values) if canMakeFilterOn(name, values.head)
         && values.distinct.length <= pushDownInFilterThreshold =>
         values.distinct.flatMap { v =>
-          makeEq.lift(nameToType(name)).map(_(name, v))
+          makeEq.lift(nameToParquetField(name).fieldType)
+            .map(_(nameToParquetField(name).fieldName, v))
         }.reduceLeftOption(FilterApi.or)
 
       case sources.StringStartsWith(name, prefix)

http://git-wip-us.apache.org/repos/asf/spark/blob/8d9495a8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index be4f498..7ebb750 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -25,6 +25,7 @@ import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate,
Operato
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
@@ -60,7 +61,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   private lazy val parquetFilters =
     new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
       conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
-      conf.parquetFilterPushDownInFilterThreshold)
+      conf.parquetFilterPushDownInFilterThreshold, conf.caseSensitiveAnalysis)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -1021,6 +1022,118 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-25207: Case-insensitive field resolution for pushdown when reading parquet")
{
+    def createParquetFilter(caseSensitive: Boolean): ParquetFilters = {
+      new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
+        conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
+        conf.parquetFilterPushDownInFilterThreshold, caseSensitive)
+    }
+    val caseSensitiveParquetFilters = createParquetFilter(caseSensitive = true)
+    val caseInsensitiveParquetFilters = createParquetFilter(caseSensitive = false)
+
+    def testCaseInsensitiveResolution(
+        schema: StructType,
+        expected: FilterPredicate,
+        filter: sources.Filter): Unit = {
+      val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+
+      assertResult(Some(expected)) {
+        caseInsensitiveParquetFilters.createFilter(parquetSchema, filter)
+      }
+      assertResult(None) {
+        caseSensitiveParquetFilters.createFilter(parquetSchema, filter)
+      }
+    }
+
+    val schema = StructType(Seq(StructField("cint", IntegerType)))
+
+    testCaseInsensitiveResolution(
+      schema, FilterApi.eq(intColumn("cint"), null.asInstanceOf[Integer]), sources.IsNull("CINT"))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.notEq(intColumn("cint"), null.asInstanceOf[Integer]),
+      sources.IsNotNull("CINT"))
+
+    testCaseInsensitiveResolution(
+      schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualTo("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.notEq(intColumn("cint"), 1000: Integer),
+      sources.Not(sources.EqualTo("CINT", 1000)))
+
+    testCaseInsensitiveResolution(
+      schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualNullSafe("CINT",
1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.notEq(intColumn("cint"), 1000: Integer),
+      sources.Not(sources.EqualNullSafe("CINT", 1000)))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.lt(intColumn("cint"), 1000: Integer), sources.LessThan("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.ltEq(intColumn("cint"), 1000: Integer),
+      sources.LessThanOrEqual("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema, FilterApi.gt(intColumn("cint"), 1000: Integer), sources.GreaterThan("CINT",
1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.gtEq(intColumn("cint"), 1000: Integer),
+      sources.GreaterThanOrEqual("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.or(
+        FilterApi.eq(intColumn("cint"), 10: Integer),
+        FilterApi.eq(intColumn("cint"), 20: Integer)),
+      sources.In("CINT", Array(10, 20)))
+
+    val dupFieldSchema = StructType(
+      Seq(StructField("cint", IntegerType), StructField("cINT", IntegerType)))
+    val dupParquetSchema = new SparkToParquetSchemaConverter(conf).convert(dupFieldSchema)
+    assertResult(None) {
+      caseInsensitiveParquetFilters.createFilter(
+        dupParquetSchema, sources.EqualTo("CINT", 1000))
+    }
+  }
+
+  test("SPARK-25207: exception when duplicate fields in case-insensitive mode") {
+    withTempPath { dir =>
+      val count = 10
+      val tableName = "spark_25207"
+      val tableDir = dir.getAbsoluteFile + "/table"
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          spark.range(count).selectExpr("id as A", "id as B", "id as b")
+            .write.mode("overwrite").parquet(tableDir)
+        }
+        sql(
+          s"""
+             |CREATE TABLE $tableName (A LONG, B LONG) USING PARQUET LOCATION '$tableDir'
+           """.stripMargin)
+
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          val e = intercept[SparkException] {
+            sql(s"select a from $tableName where b > 0").collect()
+          }
+          assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
+            """Found duplicate field(s) "B": [B, b] in case-insensitive mode"""))
+        }
+
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          checkAnswer(sql(s"select A from $tableName where B > 0"), (1 until count).map(Row(_)))
+        }
+      }
+    }
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message