spark-commits mailing list archives

From l...@apache.org
Subject spark git commit: [SPARK-11103][SQL] Filter applied on merged Parquet schema with new column fails
Date Fri, 30 Oct 2015 10:22:30 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 0df2c78a7 -> 06d3257c9


[SPARK-11103][SQL] Filter applied on merged Parquet schema with new column fails

When schema merging and predicate push-down are both enabled, queries fail because
Parquet rejects pushed-down filters whose columns do not exist in the schema of some
of the files being read. This is caused by a known Parquet issue
(https://issues.apache.org/jira/browse/PARQUET-389).

For now, this PR simply disables predicate push-down whenever a merged schema is used.
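
As an illustration (not part of the commit), the failure mode can be reproduced
roughly as follows. This is a minimal sketch assuming a Spark 1.5-era sqlContext
and writable hypothetical paths; it mirrors the regression test added below:

    // Sketch only: reproduce SPARK-11103 on an unpatched build.
    import sqlContext.implicits._

    // Enable both schema merging and filter push-down.
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")

    // Two Parquet directories with different columns ("a" vs. "c").
    val pathOne = "/tmp/spark-11103/table1"  // hypothetical paths
    val pathTwo = "/tmp/spark-11103/table2"
    (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
    (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)

    // Filtering on "c", which is missing from the first directory's files,
    // pushes an invalid filter to Parquet and throws (PARQUET-389).
    sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1").show()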

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #9327 from HyukjinKwon/SPARK-11103.

(cherry picked from commit 59db9e9c382fab40aac0633f2c779bee8cf2025f)
Signed-off-by: Cheng Lian <lian@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06d3257c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06d3257c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06d3257c

Branch: refs/heads/branch-1.5
Commit: 06d3257c95daf278e9bdc8822d1fbbc40038e852
Parents: 0df2c78
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Fri Oct 30 18:17:35 2015 +0800
Committer: Cheng Lian <lian@databricks.com>
Committed: Fri Oct 30 18:21:52 2015 +0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetRelation.scala   |  6 +++-
 .../parquet/ParquetFilterSuite.scala            | 37 ++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/06d3257c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index a73dccd..9e6c116 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -285,6 +285,10 @@ private[sql] class ParquetRelation(
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
     val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
 
+    // When schema merging is enabled and a pushed-down filter references a column that
+    // does not exist in the schema, Parquet throws an exception (PARQUET-389).
+    val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown
+
     // Parquet row group size. We will use this value as the value for
     // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
     // of these flags are smaller than the parquet row group size.
@@ -298,7 +302,7 @@ private[sql] class ParquetRelation(
         dataSchema,
         parquetBlockSize,
         useMetadataCache,
-        parquetFilterPushDown,
+        safeParquetFilterPushDown,
         assumeBinaryIsString,
         assumeInt96IsTimestamp,
         followParquetFormatSpec) _
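
For builds that do not include this patch, a workaround in the same spirit is to
turn off push-down by hand whenever a merged schema is needed; the conf keys below
are the ones the flags in this diff correspond to:

    // Workaround sketch for unpatched builds: trade push-down for merging.
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")

    // With push-down disabled, Spark evaluates the filter after the scan,
    // so Parquet never sees a predicate on a column it does not know.
    sqlContext.read.parquet("/path/one", "/path/two").filter("c = 1")  // hypothetical paths

This is exactly the trade-off the patch automates: correctness over the push-down
optimization whenever schemas are merged.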

http://git-wip-us.apache.org/repos/asf/spark/blob/06d3257c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 7f4d367..b2101be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -299,4 +299,41 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-10829: Filter combine partition key and attribute doesn't work in DataSource
scan") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an exception
since
+        // "part" is not a valid column in the actual Parquet file
+        checkAnswer(
+          sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
+          (2 to 3).map(i => Row(i, i.toString, 1)))
+      }
+    }
+  }
+
+  test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val pathOne = s"${dir.getCanonicalPath}/table1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+        val pathTwo = s"${dir.getCanonicalPath}/table2"
+        (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+        // If the "c = 1" filter gets pushed down, this query will throw an exception which
+        // Parquet emits. This is a Parquet issue (PARQUET-389).
+        checkAnswer(
+          sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1"),
+          (1 to 1).map(i => Row(i, i.toString, null)))
+      }
+    }
+  }
 }
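
To run the updated suite on its own, an invocation along these lines should work
(the exact command is an assumption about the usual sbt workflow, not part of this
commit):

    build/sbt "sql/test-only *ParquetFilterSuite"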


