spark-reviews mailing list archives

From gatorsmile <...@git.apache.org>
Subject [GitHub] spark pull request #15363: [SPARK-17791][SQL] Join reordering using star sch...
Date Thu, 09 Mar 2017 01:31:44 GMT
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15363#discussion_r105068816
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
---
    @@ -20,19 +20,340 @@ package org.apache.spark.sql.catalyst.optimizer
     import scala.annotation.tailrec
     
     import org.apache.spark.sql.catalyst.expressions._
    -import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
    +import org.apache.spark.sql.catalyst.planning.{BaseTableAccess, ExtractFiltersAndInnerJoins}
     import org.apache.spark.sql.catalyst.plans._
     import org.apache.spark.sql.catalyst.plans.logical._
     import org.apache.spark.sql.catalyst.rules._
    +import org.apache.spark.sql.catalyst.CatalystConf
    +
    +/**
    + * Encapsulates star-schema join detection.
    + */
    +case class DetectStarSchemaJoin(conf: CatalystConf) extends PredicateHelper {
    +
    +  /**
    +   * A star schema consists of one or more fact tables referencing a number of dimension
    +   * tables. In general, star-schema joins are detected using the following conditions:
    +   *  1. Informational RI constraints (reliable detection)
    +   *    + Dimension contains a primary key that is being joined to the fact table.
    +   *    + Fact table contains foreign keys referencing multiple dimension tables.
    +   *  2. Cardinality based heuristics
    +   *    + Usually, the table with the highest cardinality is the fact table.
    +   *    + Table being joined with the most number of tables is the fact table.
    +   *
    +   * To detect star joins, the algorithm uses a combination of the above two conditions.
    +   * The fact table is chosen based on the cardinality heuristics, and the dimension
    +   * tables are chosen based on the RI constraints. A star join will consist of the largest
    +   * fact table joined with the dimension tables on their primary keys. To detect that a
    +   * column is a primary key, the algorithm uses table and column statistics.
    +   *
    +   * Since Catalyst only supports left-deep tree plans, the algorithm currently returns only
    +   * the star join with the largest fact table. Choosing the largest fact table on the
    +   * driving arm to avoid large inners is in general a good heuristic. This restriction can
    +   * be lifted with support for bushy tree plans.
    +   *
    +   * The highlights of the algorithm are the following:
    +   *
    +   * Given a set of joined tables/plans, the algorithm first verifies if they are eligible
    +   * for star join detection. An eligible plan is a base table access with valid statistics.
    +   * A base table access represents Project or Filter operators above a LeafNode. Conservatively,
    +   * the algorithm only considers base table access as part of a star join since they provide
    +   * reliable statistics.
    +   *
    +   * If some of the plans are not base table access, or statistics are not available, the
    +   * algorithm falls back to the positional join reordering, since in the absence of statistics
    +   * it cannot make good planning decisions. Otherwise, the algorithm finds the table with the
    +   * largest cardinality (number of rows), which is assumed to be a fact table.
    +   *
    +   * Next, it computes the set of dimension tables for the current fact table. A dimension
    +   * table is assumed to be in an RI relationship with a fact table. To infer column uniqueness,
    +   * the algorithm compares the number of distinct values with the total number of rows in the
    +   * table. If their relative difference is within certain limits (i.e. ndvMaxError * 2, adjusted
    +   * based on TPC-DS data), the column is assumed to be unique.
    +   *
    +   * Given a star join, i.e. fact and dimension tables, the algorithm considers three cases:
    +   *
    +   * 1) The star join is an expanding join, i.e. the fact table is joined using inequality
    +   * predicates or a Cartesian product. In this case, the algorithm conservatively falls back
    +   * to the default join reordering since it cannot make good planning decisions in the absence
    +   * of a cost model.
    +   *
    +   * 2) The star join is a selective join. This case is detected by observing local predicates
    +   * on the dimension tables. In a star schema relationship, the join between the fact and the
    +   * dimension table is a FK-PK join. Heuristically, a selective dimension may reduce
    +   * the result of a join.
    +   *
    +   * 3) The star join is not a selective join (i.e. it doesn't reduce the number of rows). In
    +   * this case, the algorithm conservatively falls back to the default join reordering.
    +   *
    +   * If an eligible star join was found in step 2 above, the algorithm reorders the tables
    +   * based on the following heuristics:
    +   * 1) Place the largest fact table on the driving arm to avoid large tables on the inner of
    +   *    a join and thus favor hash joins.
    +   * 2) Apply the most selective dimensions early in the plan to reduce data flow.
    +   *
    +   * Other assumptions made by the algorithm, mainly to prevent regressions in the absence of
    +   * a cost model, are the following:
    +   * 1) Only consider star joins with more than one dimension, which is a typical
    +   *    star join scenario.
    +   * 2) If the largest tables have comparable numbers of rows, fall back to the default
    +   *    join reordering. This prevents changing the position of the large tables in the join.
    +   */
    +  def findStarJoinPlan(
    +      input: Seq[(LogicalPlan, InnerLike)],
    +      conditions: Seq[Expression]): Seq[(LogicalPlan, InnerLike)] = {
    +    assert(input.size >= 2)
    +
    +    val emptyStarJoinPlan = Seq.empty[(LogicalPlan, InnerLike)]
    +
    +    // Find if the input plans are eligible for star join detection.
    +    // An eligible plan is a base table access with valid statistics.
    +    val foundEligibleJoin = input.forall { case (plan, _) =>
    +      plan match {
    +        case BaseTableAccess(t, _) if t.stats(conf).rowCount.isDefined => true
    +        case _ => false
    +      }
    +    }
    +
    +    if (!foundEligibleJoin) {
    +      // Some plans don't have stats or are complex plans. Conservatively fall back
    +      // to the default join reordering by returning an empty star join.
    +      // This restriction can be lifted once statistics are propagated in the plan.
    +      emptyStarJoinPlan
    +
    +    } else {
    +      // Find the fact table using cardinality based heuristics i.e.
    +      // the table with the largest number of rows.
    +      val sortedFactTables = input.map { case p @ (plan, _) =>
    +        TableCardinality(p, getBaseTableAccessCardinality(plan))
    +      }.collect { case t @ TableCardinality(_, Some(_)) =>
    +        t
    +      }.sortBy(_.size)(implicitly[Ordering[Option[BigInt]]].reverse)
    +
    +      sortedFactTables match {
    +        case Nil =>
    +          emptyStarJoinPlan
    +        case table1 :: table2 :: _
    +            if table2.size.get.toDouble > conf.starJoinFactTableRatio * table1.size.get.toDouble =>
    +          // The largest tables have comparable number of rows.
    +          emptyStarJoinPlan
    +        case TableCardinality(factPlan @ (factTable, _), _) :: _ =>
    +          // Find the fact table joins.
    +          val allFactJoins = input.filterNot { case (plan, _) =>
    +            plan eq factTable
    +          }.filter { case (plan, _) =>
    +            val joinCond = findJoinConditions(factTable, plan, conditions)
    +            joinCond.nonEmpty
    +          }
    +
    +          // Find the corresponding join conditions.
    +          val allFactJoinCond = allFactJoins.flatMap { case (plan, _) =>
    +            val joinCond = findJoinConditions(factTable, plan, conditions)
    +            joinCond
    +          }
    +
    +          // Verify if the join columns have valid statistics
    +          val areStatsAvailable = allFactJoins.forall { case (dimTable, _) =>
    +            allFactJoinCond.exists {
    +              case BinaryComparison(lhs: AttributeReference, rhs: AttributeReference) =>
    +                val dimCol = if (dimTable.outputSet.contains(lhs)) lhs else rhs
    +                val factCol = if (factTable.outputSet.contains(lhs)) lhs else rhs
    +                hasStatistics(dimCol, dimTable) && hasStatistics(factCol, factTable)
    +              case _ => false
    +            }
    +          }
    +
    +          if (!areStatsAvailable) {
    +            emptyStarJoinPlan
    +          } else {
    +            // Find the subset of dimension tables. A dimension table is assumed to be in
    +            // an RI relationship with the fact table. Also, conservatively, only consider
    +            // equi-join between a fact and a dimension table.
    +            val eligibleDimPlans = allFactJoins.filter { case (dimTable, _) =>
    +              allFactJoinCond.exists {
    +                case cond @ BinaryComparison(lhs: AttributeReference, rhs: AttributeReference)
    +                    if cond.isInstanceOf[EqualTo] || cond.isInstanceOf[EqualNullSafe] =>
    +                  val dimCol = if (dimTable.outputSet.contains(lhs)) lhs else rhs
    +                  isUnique(dimCol, dimTable)
    +                case _ => false
    +              }
    +            }
    +
    +            if (eligibleDimPlans.isEmpty) {
    +              // An eligible star join was not found because the join is not
    +              // an RI join, or the star join is an expanding join.
    +              // Conservatively fall back to the default join order.
    +              emptyStarJoinPlan
    +            } else if (eligibleDimPlans.size < 2) {
    +              // Conservatively assume that a fact table is joined with more than one dimension.
    +              emptyStarJoinPlan
    +            } else if (isSelectiveStarJoin(eligibleDimPlans.map { case (p, _) => p }, conditions)) {
    +              // This is a selective star join. Reorder the dimensions based on their
    +              // cardinality and return the star-join plan.
    +              val sortedDims = eligibleDimPlans.map { case p @ (plan, _) =>
    +                TableCardinality(p, getBaseTableAccessCardinality(plan))
    +              }.sortBy(_.size).map {
    +                case TableCardinality(p1, _) => p1
    +              }
    +              factPlan +: sortedDims
    +            } else {
    +              // This is a non-selective star join. Conservatively fall back to the default
    +              // join order.
    +              emptyStarJoinPlan
    +            }
    +          }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Determines if a column referenced by a base table access is a primary key.
    +   * A column is a PK if it is not nullable and has unique values.
    +   * To determine if a column has unique values in the absence of informational
    +   * RI constraints, the number of distinct values is compared to the total
    +   * number of rows in the table. If their relative difference
    +   * is within the expected limits (i.e. 2 * spark.sql.statistics.ndv.maxError, based
    +   * on TPC-DS data results), the column is assumed to have unique values.
    +   */
    +  private def isUnique(
    +      column: Attribute,
    +      plan: LogicalPlan): Boolean = plan match {
    +    case BaseTableAccess(t, _) =>
    +      val leafCol = findLeafNodeCol(column, plan)
    +      leafCol match {
    +        case Some(col) if t.outputSet.contains(col) =>
    +          val stats = t.stats(conf)
    +          stats.rowCount match {
    +            case Some(rowCount) if rowCount >= 0 =>
    +              if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
    +                val colStats = stats.attributeStats(col)
    +                if (colStats.nullCount > 0) {
    +                  false
    +                } else {
    +                  val distinctCount = colStats.distinctCount
    +                  val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
    +                  // ndvMaxError adjusted based on TPC-DS 1TB data results
    +                  relDiff <= conf.ndvMaxError * 2
    +                }
    +              } else {
    +                false
    +              }
    +            case _ => false
    +          }
    +        case _ => false
    +      }
    +    case _ => false
    +  }
    +
    +  /**
    +   * Given a column over a base table access, it returns
    +   * the leaf node column from which the input column is derived.
    +   */
    +  @tailrec
    +  private def findLeafNodeCol(
    +      column: Attribute,
    +      plan: LogicalPlan): Option[Attribute] = plan match {
    +    case pl @ BaseTableAccess(_, _) =>
    +      pl match {
    +        case t: LeafNode if t.outputSet.contains(column) =>
    +          Option(column)
    +        case p: Project if p.outputSet.exists(_.semanticEquals(column)) =>
    +          val col = p.outputSet.find(_.semanticEquals(column)).get
    +          findLeafNodeCol(col, p.child)
    +        case f: Filter =>
    +          findLeafNodeCol(column, f.child)
    +        case _ => None
    +      }
    +    case _ => None
    +  }
    +
    +  /**
    +   * Checks if a column has statistics.
    +   * The column is assumed to be over a base table access.
    +   */
    +  private def hasStatistics(
    +      column: Attribute,
    +      plan: LogicalPlan): Boolean = plan match {
    +    case BaseTableAccess(t, _) =>
    +      val leafCol = findLeafNodeCol(column, plan)
    +      leafCol match {
    +        case Some(col) if t.outputSet.contains(col) =>
    +          val stats = t.stats(conf)
    +          stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)
    +        case _ => false
    +      }
    +    case _ => false
    +  }
    +
    +  /**
    +   * Returns the join predicates between two input plans. It only
    +   * considers basic comparison operators.
    +   */
    +  @inline
    +  private def findJoinConditions(
    +      plan1: LogicalPlan,
    +      plan2: LogicalPlan,
    +      conditions: Seq[Expression]): Seq[Expression] = {
    +    val refs = plan1.outputSet ++ plan2.outputSet
    +    conditions.filter {
    +      case BinaryComparison(_, _) => true
    +      case _ => false
    +    }.filterNot(canEvaluate(_, plan1))
    +     .filterNot(canEvaluate(_, plan2))
    +     .filter(_.references.subsetOf(refs))
    +  }
    +
    +  /**
    +   * Checks if a star join is a selective join. A star join is assumed
    +   * to be selective if there are local predicates on the dimension
    +   * tables.
    +   */
    +  private def isSelectiveStarJoin(
    +      dimTables: Seq[LogicalPlan],
    +      conditions: Seq[Expression]): Boolean = dimTables.exists {
    +    case plan @ BaseTableAccess(_, p) =>
    +      // Checks if any condition applies to the dimension tables.
    +      // Exclude the IsNotNull predicates until predicate selectivity is available.
    +      // In most cases, this predicate is artificially introduced by the Optimizer
    +      // to enforce nullability constraints.
    +      val hasLocalPredicates = conditions.filterNot(_.isInstanceOf[IsNotNull])
    +        .exists(canEvaluate(_, plan))
    +
    +      // Checks if there are any predicates pushed down to the base table access.
    +      val hasPushedDownPredicates = p.nonEmpty && !p.forall(_.isInstanceOf[IsNotNull])
    +
    +      hasLocalPredicates || hasPushedDownPredicates
    +    case _ => false
    +  }
    +
    +  /**
    +   * Helper case class to hold (plan, rowCount) pairs.
    +   */
    +  private case class TableCardinality(plan: (LogicalPlan, InnerLike), size: Option[BigInt])
    +
    +  /**
    +   * Returns the cardinality of a base table access. A base table access represents
    +   * a LeafNode, or Project or Filter operators above a LeafNode.
    +   */
    +  private def getBaseTableAccessCardinality(
    +      input: LogicalPlan): Option[BigInt] = input match {
    +    case BaseTableAccess(t, _) if t.stats(conf).rowCount.isDefined =>
    +      if (conf.cboEnabled && input.stats(conf).rowCount.isDefined) {
    +        Option(input.stats(conf).rowCount.get)
    +      } else {
    +        Option(t.stats(conf).rowCount.get)
    +      }
    +    case _ => None
    +  }
    +}
     
     /**
      * Reorder the joins and push all the conditions into join, so that the bottom ones have at least
      * one condition.
      *
      * The order of joins will not be changed if all of them already have at least one condition.
    --- End diff --
    
    We also need to update the comment about this rule.
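For readers following along, the uniqueness test described in the isUnique scaladoc reduces to a relative-difference check between a column's distinct-value count and the table row count. A standalone sketch, not the PR's code: the object name and the 0.05 ndvMaxError default are illustrative assumptions (the PR reads the error bound from CatalystConf and the counts from table statistics).

```scala
// Hypothetical standalone model of the NDV-based uniqueness heuristic.
object UniquenessSketch {
  // A column is treated as a primary key when it has no nulls and its
  // distinct count is within (ndvMaxError * 2) of the table row count.
  def looksUnique(
      distinctCount: BigInt,
      rowCount: BigInt,
      nullCount: BigInt,
      ndvMaxError: Double = 0.05): Boolean = {
    if (rowCount <= 0 || nullCount > 0) {
      false
    } else {
      val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
      relDiff <= ndvMaxError * 2
    }
  }
}
```

With a 10000-row table, a column with 9800 distinct values and no nulls passes (relative difference 0.02), while one with 120 distinct values does not.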

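Similarly, findJoinConditions keeps only comparison predicates that span both plans, i.e. that cannot be evaluated on either side alone. A simplified sketch over a toy expression model; Col and Eq are illustrative stand-ins, not Catalyst classes.

```scala
// Hypothetical miniature of the join-condition filter in findJoinConditions.
object JoinCondSketch {
  case class Col(table: String, name: String)
  case class Eq(left: Col, right: Col) {
    def references: Set[Col] = Set(left, right)
  }

  // Keep predicates whose references cover exactly the two plans: local
  // predicates (one table) and predicates touching a third table drop out.
  def joinConditions(t1: String, t2: String, conds: Seq[Eq]): Seq[Eq] =
    conds.filter(c => c.references.map(_.table) == Set(t1, t2))
}
```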

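Finally, the ordering heuristics in findStarJoinPlan (largest table is the fact table, bail out when the two largest are comparable, then dimensions ascending by size) can be sketched standalone. The Table class and the 0.9 ratio default are illustrative assumptions; the PR reads the ratio from conf.starJoinFactTableRatio.

```scala
// Hypothetical standalone model of the star-join ordering heuristics.
object StarOrderSketch {
  case class Table(name: String, rowCount: BigInt)

  // Returns the fact table first, then dimensions in ascending row count,
  // or Nil when the two largest tables have comparable cardinality.
  def starOrder(tables: Seq[Table], factTableRatio: Double = 0.9): List[Table] =
    tables.sortBy(_.rowCount)(Ordering[BigInt].reverse).toList match {
      case t1 :: t2 :: _ if t2.rowCount.toDouble > factTableRatio * t1.rowCount.toDouble =>
        Nil // comparable sizes: fall back to the default join order
      case fact :: dims =>
        fact :: dims.sortBy(_.rowCount) // most selective (smallest) dims first
      case Nil => Nil
    }
}
```

For example, a 1M-row fact table with 100-row and 5000-row dimensions comes out as fact, d1, d2; two tables of 100 and 95 rows return Nil because 95 > 0.9 * 100.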

