spark-issues mailing list archives

From "Cheng Lian (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-13911) Having condition and order by cannot both have aggregate functions
Date Tue, 15 Mar 2016 18:16:33 GMT

    [ https://issues.apache.org/jira/browse/SPARK-13911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15195821#comment-15195821 ]

Cheng Lian edited comment on SPARK-13911 at 3/15/16 6:15 PM:
-------------------------------------------------------------

Another problem with the {{ResolveAggregateFunctions}} rule is that it invokes the analyzer
recursively, which makes it tricky to understand and maintain.  Here is a possible approach
that both fixes the above issue and removes the recursive invocation.

Since having conditions and order by clauses over aggregations are tightly coupled with the
aggregation itself during resolution, we probably shouldn't treat them as separate constructs
while resolving them.  One possible fix is to introduce a new unresolved logical plan node,
{{UnresolvedAggregate}}:

{code}
case class UnresolvedAggregate(
  child: LogicalPlan,
  groupingExpressions: Seq[Expression],
  aggregateExpressions: Seq[NamedExpression],
  havingCondition: Option[Expression] = None,
  order: Seq[SortOrder] = Nil
) extends UnaryLogicalPlan
{code}

The major difference between {{UnresolvedAggregate}} and {{Aggregate}} is that the former also
carries an optional having condition and a list of sort orders.  In other words, it's a
filtered, ordered aggregate operator.  We can then use two simple rules to merge all adjacent
{{Sort}} and {{Filter}} operators directly above an {{UnresolvedAggregate}} into it:

{code}
  object MergeHavingConditions extends Rule[LogicalPlan] {
    override def apply(tree: LogicalPlan): LogicalPlan = tree transformDown {
      case Filter(condition, agg: UnresolvedAggregate) =>
        // Combines all having conditions with And
        val combinedCondition = (agg.havingCondition.toSeq :+ condition).reduce(And)
        agg.copy(havingCondition = Some(combinedCondition))
    }
  }

  object MergeSortsOverAggregates extends Rule[LogicalPlan] {
    override def apply(tree: LogicalPlan): LogicalPlan = tree transformDown {
      case Sort(order, _, agg: UnresolvedAggregate) =>
        // Only preserves the last sort order
        agg.copy(order = order)
    }
  }
{code}

(Of course, we also need to make the Dataset API and the {{GlobalAggregates}} rule produce
{{UnresolvedAggregate}} instead of {{Aggregate}}.)

Finally, we only need to resolve {{UnresolvedAggregate}} into {{Aggregate}} plus optional
{{Filter}} and {{Sort}} operators, which is relatively straightforward.  We no longer need to
invoke the analyzer recursively in {{ResolveAggregateFunctions}} to resolve aggregate functions
appearing in having and order by clauses, since those clauses are already merged into the
{{UnresolvedAggregate}} and can be resolved together with the grouping and aggregate
expressions.
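To make that final resolution step concrete, here is a minimal, self-contained sketch.  Note that this is a toy model, not actual Catalyst code: the plan nodes below stand in for Spark's {{LogicalPlan}} classes (with expressions reduced to plain strings), and {{ResolveUnresolvedAggregate}} is a hypothetical rule name.

{code}
// Toy stand-ins for Catalyst plan nodes, with expressions modeled as strings.
sealed trait Plan
case class Relation(name: String) extends Plan
case class Aggregate(child: Plan, groupings: Seq[String], aggregates: Seq[String]) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Sort(order: Seq[String], child: Plan) extends Plan

// The proposed unresolved node: an aggregate with merged having/order info.
case class UnresolvedAggregate(
    child: Plan,
    groupings: Seq[String],
    aggregates: Seq[String],
    havingCondition: Option[String] = None,
    order: Seq[String] = Nil) extends Plan

object ResolveUnresolvedAggregate {
  // Builds the Aggregate first, then wraps it in a Filter for the having
  // condition (if any) and a Sort for the sort orders (if any).
  def apply(plan: Plan): Plan = plan match {
    case UnresolvedAggregate(child, groupings, aggregates, having, order) =>
      val aggregate: Plan = Aggregate(apply(child), groupings, aggregates)
      val filtered = having.fold(aggregate)(cond => Filter(cond, aggregate))
      if (order.isEmpty) filtered else Sort(order, filtered)
    case other => other
  }
}
{code}

For the query in this ticket, an {{UnresolvedAggregate}} over {{t}} with a merged having condition {{COUNT(b) > 0}} and sort order {{COUNT(b)}} would resolve into {{Sort(..., Filter(..., Aggregate(...)))}}, with all aggregate functions resolved in one place.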

cc [~yhuai]


> Having condition and order by cannot both have aggregate functions
> ------------------------------------------------------------------
>
>                 Key: SPARK-13911
>                 URL: https://issues.apache.org/jira/browse/SPARK-13911
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.3.1, 1.4.1, 1.5.2, 1.6.1, 2.0.0
>            Reporter: Cheng Lian
>
> Given the following temporary table:
> {code}
> sqlContext range 10 select ('id as 'a, 'id as 'b) registerTempTable "t"
> {code}
> The following SQL statement can't pass analysis:
> {noformat}
> scala> sqlContext sql "SELECT * FROM t GROUP BY a HAVING COUNT(b) > 0 ORDER BY COUNT(b)" show ()
> org.apache.spark.sql.AnalysisException: expression '`t`.`b`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:36)
>   at org.apache.spark.sql.Dataset$.newDataFrame(Dataset.scala:58)
>   at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:784)
>   ... 49 elided
> {noformat}
> The reason is that analysis rule {{ResolveAggregateFunctions}} only handles the first {{Filter}} _or_ {{Sort}} directly above an {{Aggregate}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
