From: wenchen@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-18029][SQL] PruneFileSourcePartitions should not change the output of LogicalRelation
Date: Fri, 21 Oct 2016 04:28:06 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 3180272d2 -> 57e97fcbd


[SPARK-18029][SQL] PruneFileSourcePartitions should not change the output of LogicalRelation

## What changes were proposed in this pull request?

In `PruneFileSourcePartitions`, we replace the `LogicalRelation` with a pruned one. However, this replacement may change the output of the `LogicalRelation` if it doesn't have `expectedOutputAttributes`. This PR fixes that.

## How was this patch tested?

The new `PruneFileSourcePartitionsSuite`.

Author: Wenchen Fan

Closes #15569 from cloud-fan/partition-bug.
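For readers skimming the archive, here is a condensed sketch of the bug and the fix. The identifiers match the PruneFileSourcePartitions.scala hunk below; `broken` and `fixed` are illustrative names, not code from the patch:

    // Before: copying the LogicalRelation with only a new relation re-derives
    // its output attributes from the relation schema with fresh ExprIds, so
    // parent operators (Filter, Project) that still reference the old
    // attributes can no longer resolve them.
    val broken = logicalRelation.copy(relation = prunedFsRelation)

    // After: pinning expectedOutputAttributes to the original output keeps
    // the attribute ids stable across the replacement.
    val fixed = logicalRelation.copy(
      relation = prunedFsRelation,
      expectedOutputAttributes = Some(logicalRelation.output))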
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57e97fcb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57e97fcb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57e97fcb

Branch: refs/heads/master
Commit: 57e97fcbd6fe62af4acd60896feeacfa21efc222
Parents: 3180272
Author: Wenchen Fan
Authored: Fri Oct 21 12:27:53 2016 +0800
Committer: Wenchen Fan
Committed: Fri Oct 21 12:27:53 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  4 +-
 .../datasources/PruneFileSourcePartitions.scala |  4 +-
 .../spark/sql/hive/HiveDataFrameSuite.scala     |  7 +-
 .../spark/sql/hive/HiveMetadataCacheSuite.scala |  3 +-
 .../PruneFileSourcePartitionsSuite.scala        | 74 ++++++++++++++++++++
 5 files changed, 85 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 1a57a77..a97ed70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -102,8 +102,8 @@ case class CatalogTablePartition(
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
   def toRow(partitionSchema: StructType): InternalRow = {
-    InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) =>
-      Cast(Literal(spec(name)), dataType).eval()
+    InternalRow.fromSeq(partitionSchema.map { field =>
+      Cast(Literal(spec(field.name)), field.dataType).eval()
     })
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 29121a4..8689017 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -59,7 +59,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
 
         val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
         val prunedFsRelation = fsRelation.copy(location = prunedFileCatalog)(sparkSession)
-        val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
+        val prunedLogicalRelation = logicalRelation.copy(
+          relation = prunedFsRelation,
+          expectedOutputAttributes = Some(logicalRelation.output))
 
         // Keep partition-pruning predicates so that they are visible in physical planning
         val filterExpression = filters.reduceLeft(And)
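A side note on the `toRow` refactor in interface.scala above: it is behavior-preserving; each string value in the partition spec is still cast to the corresponding field's data type. A minimal, self-contained illustration of that evaluation (the spec and schema here are hypothetical, not taken from the patch):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types._

    // Hypothetical partition spec, e.g. for a partition ds=2016-10-21/hr=3.
    val spec = Map("ds" -> "2016-10-21", "hr" -> "3")
    val partitionSchema = StructType(
      StructField("ds", StringType) :: StructField("hr", IntegerType) :: Nil)

    // Mirrors the rewritten toRow body: cast each value to its field type.
    val row = InternalRow.fromSeq(partitionSchema.map { field =>
      Cast(Literal(spec(field.name)), field.dataType).eval()
    })
    // row now holds (UTF8String "2016-10-21", Int 3).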
http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
index f65e74d..1552343 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.QueryTest
 
 class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   test("table name with schema") {
@@ -78,7 +79,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
   }
 
   test("lazy partition pruning reads only necessary partition data") {
-    withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "true") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)
@@ -114,7 +115,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
   }
 
   test("all partitions read and cached when filesource partition pruning is off") {
-    withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "false") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)

http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 2ca1cd4..d290fe9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
 /**
@@ -62,7 +63,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
 
   def testCaching(pruningEnabled: Boolean): Unit = {
     test(s"partitioned table is cached when partition pruning is $pruningEnabled") {
-      withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> pruningEnabled.toString) {
+      withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> pruningEnabled.toString) {
         withTable("test") {
           withTempDir { dir =>
             spark.range(5).selectExpr("id", "id as f1", "id as f2").write
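The test changes above swap the hard-coded config name for the `SQLConf` entry, so a future rename of the key fails at compile time instead of silently exercising the wrong setting. For reference, the pattern (assuming `withSQLConf` from `SQLTestUtils` is in scope, as in these suites):

    import org.apache.spark.sql.internal.SQLConf

    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
      // body runs with file-source partition pruning enabled;
      // withSQLConf restores the previous value when the block exits
    }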
http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
new file mode 100644
index 0000000..346ea0c
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
+
+class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("PruneFileSourcePartitions", Once, PruneFileSourcePartitions) :: Nil
+  }
+
+  test("PruneFileSourcePartitions should not change the output of LogicalRelation") {
+    withTable("test") {
+      withTempDir { dir =>
+        sql(
+          s"""
+            |CREATE EXTERNAL TABLE test(i int)
+            |PARTITIONED BY (p int)
+            |STORED AS parquet
+            |LOCATION '${dir.getAbsolutePath}'""".stripMargin)
+
+        val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
+        val tableFileCatalog = new TableFileCatalog(
+          spark,
+          tableMeta.database,
+          tableMeta.identifier.table,
+          Some(tableMeta.partitionSchema),
+          0)
+
+        val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+          tableMeta.partitionColumnNames.contains(f.name)
+        })
+        val relation = HadoopFsRelation(
+          location = tableFileCatalog,
+          partitionSchema = tableMeta.partitionSchema,
+          dataSchema = dataSchema,
+          bucketSpec = None,
+          fileFormat = new ParquetFileFormat(),
+          options = Map.empty)(sparkSession = spark)
+
+        val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta))
+        val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze
+
+        val optimized = Optimize.execute(query)
+        assert(optimized.missingInput.isEmpty)
+      }
+    }
+  }
+}
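A note on the assertion at the end of the new suite: `missingInput` is the set of attributes a plan node references but none of its children produce. A sketch of what the pre-fix failure looked like (the ExprIds below are illustrative):

    // Before the fix, Optimize.execute produced a plan shaped like:
    //   Project [i#1, p#2]
    //   +- Filter (p#2 = 1)
    //      +- LogicalRelation [i#7, p#8]   // fresh ids after copy()
    // The Filter still references p#2, which no child outputs, so
    // missingInput was non-empty and the optimized plan was invalid.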