spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-18029][SQL] PruneFileSourcePartitions should not change the output of LogicalRelation
Date Fri, 21 Oct 2016 04:28:06 GMT
Repository: spark
Updated Branches:
  refs/heads/master 3180272d2 -> 57e97fcbd


[SPARK-18029][SQL] PruneFileSourcePartitions should not change the output of LogicalRelation

## What changes were proposed in this pull request?

In `PruneFileSourcePartitions`, we will replace the `LogicalRelation` with a pruned one. However,
this replacement may change the output of the `LogicalRelation` if it doesn't have `expectedOutputAttributes`.
This PR fixes it.

## How was this patch tested?

the new `PruneFileSourcePartitionsSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15569 from cloud-fan/partition-bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57e97fcb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57e97fcb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57e97fcb

Branch: refs/heads/master
Commit: 57e97fcbd6fe62af4acd60896feeacfa21efc222
Parents: 3180272
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Fri Oct 21 12:27:53 2016 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Fri Oct 21 12:27:53 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  4 +-
 .../datasources/PruneFileSourcePartitions.scala |  4 +-
 .../spark/sql/hive/HiveDataFrameSuite.scala     |  7 +-
 .../spark/sql/hive/HiveMetadataCacheSuite.scala |  3 +-
 .../PruneFileSourcePartitionsSuite.scala        | 74 ++++++++++++++++++++
 5 files changed, 85 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 1a57a77..a97ed70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -102,8 +102,8 @@ case class CatalogTablePartition(
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
   def toRow(partitionSchema: StructType): InternalRow = {
-    InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) =>
-      Cast(Literal(spec(name)), dataType).eval()
+    InternalRow.fromSeq(partitionSchema.map { field =>
+      Cast(Literal(spec(field.name)), field.dataType).eval()
     })
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 29121a4..8689017 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -59,7 +59,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan]
{
         val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
         val prunedFsRelation =
           fsRelation.copy(location = prunedFileCatalog)(sparkSession)
-        val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
+        val prunedLogicalRelation = logicalRelation.copy(
+          relation = prunedFsRelation,
+          expectedOutputAttributes = Some(logicalRelation.output))
 
         // Keep partition-pruning predicates so that they are visible in physical planning
         val filterExpression = filters.reduceLeft(And)

http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
index f65e74d..1552343 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.QueryTest
 
 class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   test("table name with schema") {
@@ -78,7 +79,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
   }
 
   test("lazy partition pruning reads only necessary partition data") {
-    withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "true") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)
@@ -114,7 +115,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with
SQLTestUt
   }
 
   test("all partitions read and cached when filesource partition pruning is off") {
-    withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "false") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)

http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 2ca1cd4..d290fe9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
 /**
@@ -62,7 +63,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
 
   def testCaching(pruningEnabled: Boolean): Unit = {
     test(s"partitioned table is cached when partition pruning is $pruningEnabled") {
-      withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> pruningEnabled.toString)
{
+      withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> pruningEnabled.toString)
{
         withTable("test") {
           withTempDir { dir =>
             spark.range(5).selectExpr("id", "id as f1", "id as f2").write

http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
new file mode 100644
index 0000000..346ea0c
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions,
TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
+
+class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
{
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("PruneFileSourcePartitions", Once, PruneFileSourcePartitions) ::
Nil
+  }
+
+  test("PruneFileSourcePartitions should not change the output of LogicalRelation") {
+    withTable("test") {
+      withTempDir { dir =>
+        sql(
+          s"""
+            |CREATE EXTERNAL TABLE test(i int)
+            |PARTITIONED BY (p int)
+            |STORED AS parquet
+            |LOCATION '${dir.getAbsolutePath}'""".stripMargin)
+
+        val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
+        val tableFileCatalog = new TableFileCatalog(
+          spark,
+          tableMeta.database,
+          tableMeta.identifier.table,
+          Some(tableMeta.partitionSchema),
+          0)
+
+        val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+          tableMeta.partitionColumnNames.contains(f.name)
+        })
+        val relation = HadoopFsRelation(
+          location = tableFileCatalog,
+          partitionSchema = tableMeta.partitionSchema,
+          dataSchema = dataSchema,
+          bucketSpec = None,
+          fileFormat = new ParquetFileFormat(),
+          options = Map.empty)(sparkSession = spark)
+
+        val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta))
+        val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze
+
+        val optimized = Optimize.execute(query)
+        assert(optimized.missingInput.isEmpty)
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message