spark-commits mailing list archives

From yh...@apache.org
Subject spark git commit: [SPARK-11301] [SQL] fix case sensitivity for filter on partitioned columns
Date Thu, 29 Oct 2015 23:36:55 GMT
Repository: spark
Updated Branches:
  refs/heads/master 4f5e60c64 -> 96cf87f66


[SPARK-11301] [SQL] fix case sensitivity for filter on partitioned columns

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9271 from cloud-fan/filter.
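
The commit message is terse, so a short summary of the bug may help: with
spark.sql.caseSensitive set to false, a filter referencing a partitioned
column with different casing (e.g. yEAr instead of year) was compared against
the partition column names as raw strings. The mismatch meant the predicate
was not recognized as a partition filter and could instead be pushed down to
the data source scan, where the column does not exist in the data files. A
minimal spark-shell reproduction, mirroring the regression test added below
(the output path /tmp/t is hypothetical):

  import sqlContext.implicits._  // already in scope in spark-shell

  sqlContext.setConf("spark.sql.caseSensitive", "false")

  // Write a table partitioned by "year", then read it back.
  Seq(2012 -> "a").toDF("year", "val").write.partitionBy("year").parquet("/tmp/t")
  val df = sqlContext.read.parquet("/tmp/t")

  // Before this commit the mixed-case reference could fail; after it, the
  // predicate is classified as a partition filter and returns Row("a").
  df.filter($"yEAr" > 2000).select($"val").show()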


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96cf87f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96cf87f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96cf87f6

Branch: refs/heads/master
Commit: 96cf87f66d47245b19e719cb83947042b21546fa
Parents: 4f5e60c
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Thu Oct 29 16:36:52 2015 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Thu Oct 29 16:36:52 2015 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSourceStrategy.scala  | 12 +++++-------
 .../scala/org/apache/spark/sql/DataFrameSuite.scala     | 10 ++++++++++
 2 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/96cf87f6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index ffb4645..af6626c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -63,16 +63,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
         if t.partitionSpec.partitionColumns.nonEmpty =>
       // We divide the filter expressions into 3 parts
-      val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
+      val partitionColumns = AttributeSet(
+        t.partitionColumns.map(c => l.output.find(_.name == c.name).get))
 
-      // TODO this is case-sensitive
-      // Only prunning the partition keys
-      val partitionFilters =
-        filters.filter(_.references.map(_.name).toSet.subsetOf(partitionColumnNames))
+      // Only pruning the partition keys
+      val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
 
       // Only pushes down predicates that do not reference partition keys.
-      val pushedFilters =
-        filters.filter(_.references.map(_.name).toSet.intersect(partitionColumnNames).isEmpty)
+      val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
 
       // Predicates with both partition keys and attributes
       val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
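
Why this works: when analysis is case-insensitive, Catalyst resolves a column
reference by exprId but may keep the user's casing in the attribute name, so
comparing plain name strings (the old code) could miss a genuine partition
column. The new code maps each partition column to the matching attribute in
l.output and compares AttributeSets, which match on exprId rather than on
name. A minimal, self-contained sketch of the idea, using a hypothetical
simplified Attr type rather than Spark's actual classes:

  // Hypothetical model: identity is the exprId, not the (re-cased) name.
  case class Attr(name: String, exprId: Long)

  val year      = Attr("year", 1L)          // partition column attribute
  val filterRef = year.copy(name = "yEAr")  // same exprId, user's casing

  // Old approach: compare by name string -- misses the match.
  Set(filterRef.name).subsetOf(Set(year.name))      // false

  // New approach: compare by exprId (what AttributeSet does) -- matches.
  Set(filterRef.exprId).subsetOf(Set(year.exprId))  // true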

http://git-wip-us.apache.org/repos/asf/spark/blob/96cf87f6/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 59565a6..c9d6e19 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -987,4 +987,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
     assert(df.select($"src.i".cast(StringType)).columns.head === "i")
   }
+
+  test("SPARK-11301: fix case sensitivity for filter on partitioned columns") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      withTempPath { path =>
+        Seq(2012 -> "a").toDF("year", "val").write.partitionBy("year").parquet(path.getAbsolutePath)
+        val df = sqlContext.read.parquet(path.getAbsolutePath)
+        checkAnswer(df.filter($"yEAr" > 2000).select($"val"), Row("a"))
+      }
+    }
+  }
 }
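
To run just the new regression test locally, the usual ScalaTest substring
filter should work; the exact sbt invocation below is an assumption based on
the file living under the sql/core module:

  build/sbt "sql/testOnly org.apache.spark.sql.DataFrameSuite -- -z SPARK-11301"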



