Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 16C86186E5 for ; Thu, 18 Feb 2016 21:08:10 +0000 (UTC) Received: (qmail 65875 invoked by uid 500); 18 Feb 2016 21:07:57 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 65841 invoked by uid 500); 18 Feb 2016 21:07:57 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 65832 invoked by uid 99); 18 Feb 2016 21:07:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Feb 2016 21:07:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 29360DFF67; Thu, 18 Feb 2016 21:07:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: joshrosen@apache.org To: commits@spark.apache.org Message-Id: <657f76c2e6d4476192a6d980c2e1edf0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-13351][SQL] fix column pruning on Expand Date: Thu, 18 Feb 2016 21:07:57 +0000 (UTC) Repository: spark Updated Branches: refs/heads/master 78562535f -> 26f38bb83 [SPARK-13351][SQL] fix column pruning on Expand Currently, the columns in projects of Expand that are not used by Aggregate are not pruned; this PR fixes that. Author: Davies Liu Closes #11225 from davies/fix_pruning_expand. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26f38bb8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26f38bb8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26f38bb8 Branch: refs/heads/master Commit: 26f38bb83c423e512955ca25775914dae7e5bbf0 Parents: 7856253 Author: Davies Liu Authored: Thu Feb 18 13:07:41 2016 -0800 Committer: Josh Rosen Committed: Thu Feb 18 13:07:41 2016 -0800 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/Optimizer.scala | 10 ++++++ .../catalyst/optimizer/ColumnPruningSuite.scala | 33 ++++++++++++++++++-- 2 files changed, 41 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/26f38bb8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 567010f..55c168d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -300,6 +300,16 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper { */ object ColumnPruning extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case a @ Aggregate(_, _, e @ Expand(projects, output, child)) + if (e.outputSet -- a.references).nonEmpty => + val newOutput = output.filter(a.references.contains(_)) + val newProjects = projects.map { proj => + proj.zip(output).filter { case (e, a) => + newOutput.contains(a) + }.unzip._1 + } + a.copy(child = Expand(newProjects, newOutput, child)) + case a @ Aggregate(_, _, e @ 
Expand(_, _, child)) if (child.outputSet -- e.references -- a.references).nonEmpty => a.copy(child = e.copy(child = prunedChild(child, e.references ++ a.references))) http://git-wip-us.apache.org/repos/asf/spark/blob/26f38bb8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 81f3928..c890fff 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.Explode +import org.apache.spark.sql.catalyst.expressions.{Explode, Literal} import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.types.StringType @@ -96,5 +96,34 @@ class ColumnPruningSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("Column pruning for Expand") { + val input = LocalRelation('a.int, 'b.string, 'c.double) + val query = + Aggregate( + Seq('aa, 'gid), + Seq(sum('c).as("sum")), + Expand( + Seq( + Seq('a, 'b, 'c, Literal.create(null, StringType), 1), + Seq('a, 'b, 'c, 'a, 2)), + Seq('a, 'b, 'c, 'aa.int, 'gid.int), + input)).analyze + val optimized = Optimize.execute(query) + + val expected = + Aggregate( + Seq('aa, 'gid), + Seq(sum('c).as("sum")), + Expand( + Seq( + Seq('c, Literal.create(null, 
StringType), 1), + Seq('c, 'a, 2)), + Seq('c, 'aa.int, 'gid.int), + Project(Seq('c, 'a), + input))).analyze + + comparePlans(optimized, expected) + } + // todo: add more tests for column pruning } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org