From: wenchen@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-20366][SQL] Fix recursive join reordering: inside joins are not reordered
Date: Tue, 18 Apr 2017 12:12:31 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master d4f10cbbe -> 321b4f03b


[SPARK-20366][SQL] Fix recursive join reordering: inside joins are not reordered

## What changes were proposed in this pull request?

If a plan has multi-level successive joins, e.g.:
```
         Join
        /    \
    Union    t5
     /   \
   Join   t4
   /   \
 Join   t3
 /  \
t1   t2
```
we currently fail to reorder the inside joins, i.e. the joins among t1, t2 and t3.

In join reordering, we use `OrderedJoin` to mark a join that has already been ordered, so that when transforming down the plan, these joins are not reordered again. But there is a problem in the definition of `OrderedJoin`: the real join node is a constructor parameter, not a child. This breaks the transform procedure, because `mapChildren` applies the transform function to constructor parameters that act as children; here the real children are buried inside the `join` parameter, so the transform never descends into the wrapped join.

In this patch, we change `OrderedJoin` to a class with the same structure as a join node.

## How was this patch tested?

Added a corresponding test case.

Author: wangzhenhua

Closes #17668 from wzhfy/recursiveReorder.
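To make the failure mode concrete, here is a minimal, self-contained sketch in plain Scala. `Node`, `Leaf`, `Join`, and `OldOrderedJoin` below are toy stand-ins invented for illustration (Catalyst's real `TreeNode`, `mapChildren`, and `transformDown` are considerably more general); the sketch only mimics the relevant contract: a top-down transform rebuilds each node from its transformed children, so a subtree held as a plain constructor parameter is never rebuilt.

```scala
// Toy tree model mimicking Catalyst's transformDown/mapChildren contract.
sealed trait Node {
  def children: Seq[Node]
  // Rebuild this node with the function applied to its children,
  // analogous to TreeNode.mapChildren.
  def mapChildren(f: Node => Node): Node
  // Apply the rule to this node first, then recurse into the children,
  // analogous to TreeNode.transformDown.
  def transformDown(rule: PartialFunction[Node, Node]): Node = {
    val applied = rule.applyOrElse(this, identity[Node])
    applied.mapChildren(_.transformDown(rule))
  }
}

case class Leaf(name: String) extends Node {
  def children: Seq[Node] = Nil
  def mapChildren(f: Node => Node): Node = this
}

case class Join(left: Node, right: Node) extends Node {
  def children: Seq[Node] = Seq(left, right)
  def mapChildren(f: Node => Node): Node = Join(f(left), f(right))
}

// Like the old OrderedJoin: the wrapped join is a constructor parameter.
// children is overridden to *look* right, but a rebuild from constructor
// parameters leaves the wrapped join untouched, so nothing below the
// wrapper is ever transformed.
case class OldOrderedJoin(join: Join) extends Node {
  def children: Seq[Node] = Seq(join.left, join.right)
  def mapChildren(f: Node => Node): Node = this // the bug: recursion stops here
}

object Demo extends App {
  val inner = Join(Join(Leaf("t1"), Leaf("t2")), Leaf("t3"))
  val plan: Node = Join(OldOrderedJoin(inner), Leaf("t5"))

  // Try to rewrite every leaf: t5 is reached, but t1, t2, t3 are not,
  // because the transform cannot descend through the wrapper.
  val rewritten = plan.transformDown { case Leaf(n) => Leaf(n.toUpperCase) }
  println(rewritten)
  // Join(OldOrderedJoin(Join(Join(Leaf(t1),Leaf(t2)),Leaf(t3))),Leaf(T5))
}
```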
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/321b4f03
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/321b4f03
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/321b4f03

Branch: refs/heads/master
Commit: 321b4f03bc983c582a3c6259019c077cdfac9d26
Parents: d4f10cb
Author: wangzhenhua
Authored: Tue Apr 18 20:12:21 2017 +0800
Committer: Wenchen Fan
Committed: Tue Apr 18 20:12:21 2017 +0800

----------------------------------------------------------------------
 .../optimizer/CostBasedJoinReorder.scala        | 22 +++++----
 .../catalyst/optimizer/JoinReorderSuite.scala   | 49 ++++++++++++++++++--
 2 files changed, 58 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/321b4f03/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index c704c2e..51eca6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Expression, PredicateHelper}
-import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike}
+import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike, JoinType}
 import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, Join, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
@@ -47,7 +47,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
     }
     // After reordering is finished, convert OrderedJoin back to Join
     result transformDown {
-      case oj: OrderedJoin => oj.join
+      case OrderedJoin(left, right, jt, cond) => Join(left, right, jt, cond)
     }
   }
 }
@@ -87,22 +87,24 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
   }
 
   private def replaceWithOrderedJoin(plan: LogicalPlan): LogicalPlan = plan match {
-    case j @ Join(left, right, _: InnerLike, Some(cond)) =>
+    case j @ Join(left, right, jt: InnerLike, Some(cond)) =>
       val replacedLeft = replaceWithOrderedJoin(left)
       val replacedRight = replaceWithOrderedJoin(right)
-      OrderedJoin(j.copy(left = replacedLeft, right = replacedRight))
+      OrderedJoin(replacedLeft, replacedRight, jt, Some(cond))
     case p @ Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond))) =>
       p.copy(child = replaceWithOrderedJoin(j))
     case _ =>
       plan
   }
+}
 
-  /** This is a wrapper class for a join node that has been ordered. */
-  private case class OrderedJoin(join: Join) extends BinaryNode {
-    override def left: LogicalPlan = join.left
-    override def right: LogicalPlan = join.right
-    override def output: Seq[Attribute] = join.output
-  }
+/** This is a mimic class for a join node that has been ordered. */
+case class OrderedJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    joinType: JoinType,
+    condition: Option[Expression]) extends BinaryNode {
+  override def output: Seq[Attribute] = left.output ++ right.output
 }
 
 /**
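For contrast, a companion sketch of the fixed shape, continuing the toy `Node` model from the sketch above (again an illustration, not Catalyst's API): because the wrapper's children are its constructor parameters, exactly as in the patched `OrderedJoin`, the rebuild propagates the transform into the wrapped joins, and the wrapper can afterwards be pattern-matched back into a plain join, just as the rule's final `transformDown` now does.

```scala
// Fixed wrapper, shaped like the new OrderedJoin: its children are real
// constructor parameters, so mapChildren rebuilds it with the transformed
// children and transformDown can keep descending.
case class FixedOrderedJoin(left: Node, right: Node) extends Node {
  def children: Seq[Node] = Seq(left, right)
  def mapChildren(f: Node => Node): Node = FixedOrderedJoin(f(left), f(right))
}

object FixedDemo extends App {
  val inner = Join(Join(Leaf("t1"), Leaf("t2")), Leaf("t3"))
  val plan: Node = Join(FixedOrderedJoin(inner.left, inner.right), Leaf("t5"))

  // The same leaf rewrite now reaches t1, t2 and t3 as well ...
  val rewritten = plan.transformDown { case Leaf(n) => Leaf(n.toUpperCase) }

  // ... and the wrapper is unwrapped by matching its fields, just as the
  // patched rule does with `case OrderedJoin(left, right, jt, cond)`.
  val restored = rewritten.transformDown {
    case FixedOrderedJoin(l, r) => Join(l, r)
  }
  println(restored)
  // Join(Join(Join(Leaf(T1),Leaf(T2)),Leaf(T3)),Leaf(T5))
}
```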
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/321b4f03/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index 1922eb3..71db4e2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -25,13 +25,12 @@ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, CBO_ENABLED, JOIN_REORDER_ENABLED}
+import org.apache.spark.sql.internal.SQLConf.{CBO_ENABLED, JOIN_REORDER_ENABLED}
 
 
 class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
 
-  override val conf = new SQLConf().copy(
-    CASE_SENSITIVE -> true, CBO_ENABLED -> true, JOIN_REORDER_ENABLED -> true)
+  override val conf = new SQLConf().copy(CBO_ENABLED -> true, JOIN_REORDER_ENABLED -> true)
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
@@ -212,6 +211,50 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
     }
   }
 
+  test("reorder recursively") {
+    // Original order:
+    //          Join
+    //         /    \
+    //     Union    t5
+    //      /    \
+    //    Join    t4
+    //    /    \
+    //  Join    t3
+    //  /    \
+    // t1     t2
+    val bottomJoins =
+      t1.join(t2).join(t3).where((nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")) &&
+        (nameToAttr("t1.v-1-10") === nameToAttr("t3.v-1-100")))
+        .select(nameToAttr("t1.v-1-10"))
+
+    val originalPlan = bottomJoins
+      .union(t4.select(nameToAttr("t4.v-1-10")))
+      .join(t5, Inner, Some(nameToAttr("t1.v-1-10") === nameToAttr("t5.v-1-5")))
+
+    // Should be able to reorder the bottom part.
+    // Best order:
+    //          Join
+    //         /    \
+    //     Union    t5
+    //      /    \
+    //    Join    t4
+    //    /    \
+    //  Join    t2
+    //  /    \
+    // t1     t3
+    val bestBottomPlan =
+      t1.join(t3, Inner, Some(nameToAttr("t1.v-1-10") === nameToAttr("t3.v-1-100")))
+        .select(nameToAttr("t1.k-1-2"), nameToAttr("t1.v-1-10"))
+        .join(t2, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")))
+        .select(nameToAttr("t1.v-1-10"))
+
+    val bestPlan = bestBottomPlan
+      .union(t4.select(nameToAttr("t4.v-1-10")))
+      .join(t5, Inner, Some(nameToAttr("t1.v-1-10") === nameToAttr("t5.v-1-5")))
+
+    assertEqualPlans(originalPlan, bestPlan)
+  }
+
   private def assertEqualPlans(
       originalPlan: LogicalPlan,
       groundTruthBestPlan: LogicalPlan): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org