spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dilipbiswal <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-13427][SQL] Support USING clause in JOI...
Date Thu, 17 Mar 2016 05:59:05 GMT
Github user dilipbiswal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11297#discussion_r56459052
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
---
    @@ -1329,48 +1329,72 @@ class Analyzer(
       }
     
       /**
    -   * Removes natural joins by calculating output columns based on output from two sides,
    -   * Then apply a Project on a normal Join to eliminate natural join.
    +   * Removes natural or using joins by calculating output columns based on output from two sides,
    +   * Then apply a Project on a normal Join to eliminate natural or using join.
        */
    -  object ResolveNaturalJoin extends Rule[LogicalPlan] {
    +  object ResolveNaturalAndUsingJoin extends Rule[LogicalPlan] {
         override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    +      case j @ Join(left, right, UsingJoin(joinType, usingCols), condition)
    +          if left.resolved && right.resolved && j.duplicateResolved =>
    +        // Resolve the column names referenced in using clause from both the legs of join.
    +        val lCols = usingCols.flatMap(col => left.resolveQuoted(col.name, resolver))
    +        val rCols = usingCols.flatMap(col => right.resolveQuoted(col.name, resolver))
    +        if ((lCols.length == usingCols.length) && (rCols.length == usingCols.length))
{
    +          val joinNames = lCols.map(exp => exp.name)
    +          commonNaturalJoinProcessing(left, right, joinType, joinNames, None)
    +        } else {
    +          j
    +        }
           case j @ Join(left, right, NaturalJoin(joinType), condition) if j.resolvedExceptNatural
=>
             // find common column names from both sides
             val joinNames = left.output.map(_.name).intersect(right.output.map(_.name))
    -        val leftKeys = joinNames.map(keyName => left.output.find(_.name == keyName).get)
    -        val rightKeys = joinNames.map(keyName => right.output.find(_.name == keyName).get)
    -        val joinPairs = leftKeys.zip(rightKeys)
    -
    -        // Add joinPairs to joinConditions
    -        val newCondition = (condition ++ joinPairs.map {
    -          case (l, r) => EqualTo(l, r)
    -        }).reduceOption(And)
    -
    -        // columns not in joinPairs
    -        val lUniqueOutput = left.output.filterNot(att => leftKeys.contains(att))
    -        val rUniqueOutput = right.output.filterNot(att => rightKeys.contains(att))
    -
    -        // the output list looks like: join keys, columns from left, columns from right
    -        val projectList = joinType match {
    -          case LeftOuter =>
    -            leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true))
    -          case RightOuter =>
    -            rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput
    -          case FullOuter =>
    -            // in full outer join, joinCols should be non-null if there is.
    -            val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l,
r)), l.name)() }
    -            joinedCols ++
    -              lUniqueOutput.map(_.withNullability(true)) ++
    -              rUniqueOutput.map(_.withNullability(true))
    -          case Inner =>
    -            rightKeys ++ lUniqueOutput ++ rUniqueOutput
    -          case _ =>
    -            sys.error("Unsupported natural join type " + joinType)
    -        }
    -        // use Project to trim unnecessary fields
    -        Project(projectList, Join(left, right, joinType, newCondition))
    +        commonNaturalJoinProcessing(left, right, joinType, joinNames, condition)
    +    }
    +  }
    +
    +  private def commonNaturalJoinProcessing(
    +     left: LogicalPlan,
    +     right: LogicalPlan,
    +     joinType: JoinType,
    +     joinNames: Seq[String],
    +     condition: Option[Expression]) = {
    +    val leftKeys = joinNames.map(keyName => left.output.find(_.name == keyName).get)
    +    val rightKeys = joinNames.map(keyName => right.output.find(_.name == keyName).get)
    +    val joinPairs = leftKeys.zip(rightKeys)
    +
    +    // Add joinPairs to joinConditions
    +    val newCondition = (condition ++ joinPairs.map {
    --- End diff --
    
    @hvanhovell Thank you for your review! I have made the change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message