carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipesala <...@git.apache.org>
Subject [GitHub] carbondata pull request #1544: [CARBONDATA-1740][Pre-Aggregate] Fixed order ...
Date Mon, 04 Dec 2017 14:56:30 GMT
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1544#discussion_r154670559
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
---
    @@ -328,9 +462,145 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession)
extends Rule
             Aggregate(updatedGroupExp,
               updatedAggExp,
               newChild)
    +        // case for aggregation query with order by
    +      case Project(_, Sort(sortOrders, global, Aggregate(groupingExp,
    +      aggregateExp,
    +      subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))
    +        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema
=>
    +        val (updatedGroupExp, updatedAggExp, newChild, None) =
    +          getUpdatedExpressions(groupingExp,
    +            aggregateExp,
    +            subQuery,
    +            None,
    +            aggDataMapSchema,
    +            attributes,
    +            childPlan)
    +        val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
    +          sortOrders,
    +          aggDataMapSchema,
    +          attributes)
    +        Project(updatedProjectList,
    +          Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild)))
    +       // case for handling aggregation query with filter and order by
    +      case Project(_, Sort(sortOrders, global, Aggregate(groupingExp,
    +      aggregateExp,
    +      Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))))
    +        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema
=>
    +        val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
    +          getUpdatedExpressions(groupingExp,
    +            aggregateExp,
    +            subQuery,
    +            Some(expression),
    +            aggDataMapSchema,
    +            attributes,
    +            childPlan)
    +        val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
    +          sortOrders,
    +          aggDataMapSchema,
    +          attributes)
    +        Project(updatedProjectList,
    +          Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp,
    +            Filter(updatedFilterExpression.get, newChild))))
    +      // case for handling aggregation with order by when only projection column exits
    +      case Sort(sortOrders, global, Aggregate(groupingExp,
    +      aggregateExp,
    +      subQuery@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +             .hasDataMapSchema =>
    +        val (updatedGroupExp, updatedAggExp, newChild, None) =
    +          getUpdatedExpressions(groupingExp,
    +            aggregateExp,
    +            subQuery,
    +            None,
    +            aggDataMapSchema,
    +            attributes,
    +            childPlan)
    +        val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
    +          sortOrders,
    +          aggDataMapSchema,
    +          attributes)
    +        Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild))
    +      // case for handling aggregation with order by and filter when only projection column exits
    +      case Sort(sortOrders, global, Aggregate(groupingExp,
    +      aggregateExp,
    +      Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))
    +        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema
=>
    +        val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
    +          getUpdatedExpressions(groupingExp,
    +            aggregateExp,
    +            subQuery,
    +            Some(expression),
    +            aggDataMapSchema,
    +            attributes,
    +            childPlan)
    +        val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
    +          sortOrders,
    +          aggDataMapSchema,
    +          attributes)
    +        Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild))
         }
       }
     
    +  /**
    +   * Below method will be used to updated the maintable plan for order by query
    +   * In case of order by we need to update project list and sort order attributes.
    +   *
    +   * @param aggregateExp
    +   *                     child table aggregate expression
    +   * @param sortOrders
    +   *                   sort order expression in maintable plan
    +   * @param aggDataMapSchema
    +   *                         child data map schema
    +   * @param attributes
    +   *                   child attributes
    +   * @return updated project list and updated sort order
    +   */
    +  def transformPlanForOrderBy(aggregateExp: Seq[NamedExpression],
    +      sortOrders: Seq[SortOrder], aggDataMapSchema: DataMapSchema,
    +      attributes: Seq[AttributeReference]): (Seq[NamedExpression], Seq[SortOrder]) =
{
    +    val updatedProjectList = new ArrayBuffer[NamedExpression]()
    +    // getting the updated project list from aggregate expression
    +    aggregateExp.foreach{f => f.transform {
    +      // for projection column
    +      case alias@Alias(attr: AttributeReference, name) =>
    +        updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId,
    +          alias.qualifier,
    +          alias.isGenerated)
    +        alias
    +        // for aggregaton column
    +      case alias@Alias(attr: AggregateExpression, name) =>
    +        updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId,
    +          alias.qualifier,
    +          alias.isGenerated)
    +        alias
    +    }
    +    }
    +    val updatedSortOrders = new ArrayBuffer[SortOrder]()
    +    // getting the updated sort order
    +    sortOrders.map {
    --- End diff --
    
    Use the output of `map` directly; there is no need to add the results to `updatedSortOrders`.


---

Mime
View raw message