carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipesala <...@git.apache.org>
Subject [GitHub] carbondata pull request #1464: [WIP][CARBONDATA-1523]Pre Aggregate table sel...
Date Mon, 06 Nov 2017 07:07:22 GMT
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r149005119
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala
---
    @@ -0,0 +1,756 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, SparkSession}
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
    +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide,
    +Expression, NamedExpression}
    +import org.apache.spark.sql.catalyst.expressions.aggregate._
    +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
    +import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +
    +/**
    + * Class for applying Pre Aggregate rules
    + * Responsibility.
    + * 1. Check plan is valid plan for updating the parent table plan with child table
    + * 2. Updated the plan based on child schema
    + *
    + * Rules for Updating the plan
    + * 1. Grouping expression rules
    + *    1.1 Change the parent attribute reference for of group expression
    + * to child attribute reference
    + *
    + * 2. Aggregate expression rules
    + *    2.1 Change the parent attribute reference for of group expression to
    + * child attribute reference
    + *    2.2 Change the count AggregateExpression to Sum as count
    + * is already calculated so in case of aggregate table
    + * we need to apply sum to get the count
    + *    2.3 In case of average aggregate function select 2 columns from aggregate table
    + * with aggregation sum and count.
    + * Then add divide(sum(column with sum), sum(column with count)).
    + * Note: During aggregate table creation, for average the table will be created with
    + * two columns, one for sum(column) and one for count(column), to support rollup
    + *
    + * 3. Filter Expression rules.
    + *    3.1 Updated filter expression attributes with child table attributes
    + * 4. Update the Parent Logical relation with child Logical relation
    + *
    + * @param sparkSession
    + * spark session
    + */
    +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan]
{
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    var needAnalysis = true
    +    plan.transformExpressions {
    +      // first check whether any preAgg scala function is present in the plan; if it is,
    +      // then the call is from the create preaggregate table class, so there is no need to
    +      // transform the query plan
    +      case al@Alias(_, name) if name.equals("preAgg") =>
    +        needAnalysis = false
    +        al
    +      // if any unresolved alias is present in the query then wait for the plan to be resolved:
    +      // return the same plan, as we can transform the plan only when everything is resolved
    +      case unresolveAlias@UnresolvedAlias(_, _) =>
    +        needAnalysis = false
    +        unresolveAlias
    +    }
    +    // if plan is not valid for transformation then return same plan
    +    if (!needAnalysis) {
    +      plan
    +    } else {
    +      // create buffer to collect all the column and its metadata information
    +      val list = scala.collection.mutable.ListBuffer.empty[QueryColumn]
    +      var isValidPlan = true
    +      val carbonTable = plan match {
    +        // matching the plan based on supported plan
    +        // if plan is matches with any case it will validate and get all
    +        // information required for transforming the plan
    +
    +        // When plan has grouping expression, aggregate expression
    +        // subquery
    +        case Aggregate(groupingExp,
    +        aggregateExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _))
    +          // only carbon query plan is supported checking whether logical relation is
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // the plan will be transformed only when the table has a child data map
    +          // (pre aggregate table)
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +          }
    +          carbonTable
    +
    +        // below case for handling filter query
    +        // When plan has grouping expression, aggregate expression
    +        // filter expression
    +        case Aggregate(groupingExp, aggregateExp,
    +        Filter(filterExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _)))
    +          // only carbon query plan is supported checking whether logical relation is
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // the plan will be transformed only when the table has a child data map
    +          // (pre aggregate table)
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +            // getting the columns from filter expression
    +            filterExp.transform {
    +              case attr: AttributeReference =>
    +                list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn
= true)
    +                attr
    +            }
    +          }
    +          carbonTable
    +
    +        // When plan has grouping expression, aggregate expression
    +        // logical relation
    +        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    +          // only carbon query plan is supported checking whether logical relation is
    +          // is for carbon
    +          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    +          // the plan will be transformed only when the table has a child data map
    +          // (pre aggregate table)
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +          }
    +          carbonTable
    +        case _ =>
    +          isValidPlan = false
    +          null
    +      }
    +      // if plan is valid then update the plan with child attributes
    +      if (isValidPlan) {
    +        // getting all the projection columns
    +        val listProjectionColumn = list
    +          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn)
    +        // getting all the filter columns
    +        val listFilterColumn = list
    +          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
    +        // getting all the aggregation columns
    +        val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
    +        // create a query plan object which will be used to select the list of pre aggregate
    +        // tables that match this plan
    +        val queryPlan = new QueryPlan(listProjectionColumn.asJava,
    +          listAggregationColumn.asJava,
    +          listFilterColumn.asJava)
    +        // create aggregate table selector object
    +        val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
    +        // select the list of valid child tables
    +        val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
    +        // if it doesnot match with any pre aggregate table return the same plan
    +        if (!selectedDataMapSchemas.isEmpty) {
    +          // sort the selected child schemas by size to select the smallest pre aggregate
    +          // table
    +          val (aggDataMapSchema, carbonRelation) =
    +            selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
    +              val catalog = sparkSession.sessionState.catalog
    +              val carbonRelation = catalog
    +                .lookupRelation(TableIdentifier(selectedDataMapSchema.getRelationIdentifier
    +                  .getTableName,
    +                  Some(selectedDataMapSchema.getRelationIdentifier
    +                    .getDatabaseName))).asInstanceOf[SubqueryAlias].child
    +                .asInstanceOf[LogicalRelation]
    +              (selectedDataMapSchema, carbonRelation)
    +            }.sortBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes)
    +              .head
    +          // transform the query plan based on selected child schema
    +          transformPreAggQueryPlan(plan, aggDataMapSchema, carbonRelation)
    +        } else {
    +          plan
    +        }
    +      } else {
    +        plan
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to get the child attribute reference
    +   * based on parent name
    +   *
    +   * @param dataMapSchema
    +   * child schema
    +   * @param attributeReference
    +   * parent attribute reference
    +   * @param childCarbonRelation
    +   * child logical relation
    +   * @param aggFunction
    +   * aggregation function applied on child
    +   * @return child attribute reference
    +   */
    +  def getChildAttributeReference(dataMapSchema: DataMapSchema,
    +      attributeReference: AttributeReference,
    +      childCarbonRelation: LogicalRelation,
    +      aggFunction: String = ""): AttributeReference = {
    +    val columnSchema = if (aggFunction.isEmpty) {
    +      dataMapSchema.getChildColumnByParentName(attributeReference.name)
    +    } else {
    +      dataMapSchema.getChildColByParentWithAggFun(attributeReference.name, aggFunction)
    +    }
    +    // here column schema cannot be null, if it is null then aggregate table selection
    +    // logic has some problem
    +    if (null == columnSchema) {
    +      throw new AnalysisException("Column doesnot exists in Pre Aggregate table")
    +    }
    +    // finding the child attribute from child logical relation
    +    childCarbonRelation.attributeMap.find(p => p._2.name.equals(columnSchema.getColumnName)).get._2
    +  }
    +
    +  /**
    +   * Below method will be used to transform the main table plan to child table plan
    +   * The rules for transforming are as below.
    +   * 1. Grouping expression rules
    +   *    1.1 Change the parent attribute reference for of group expression
    +   * to child attribute reference
    +   *
    +   * 2. Aggregate expression rules
    +   *    2.1 Change the parent attribute reference for of group expression to
    +   * child attribute reference
    +   *    2.2 Change the count AggregateExpression to Sum as count
    +   * is already calculated so in case of aggregate table
    +   * we need to apply sum to get the count
    +   *    2.3 In case of average aggregate function select 2 columns from aggregate table
    +   * with aggregation sum and count.
    +   * Then add divide(sum(column with sum), sum(column with count)).
    +   * Note: During aggregate table creation, for average the table will be created with
    +   * two columns, one for sum(column) and one for count(column), to support rollup
    +   * 3. Filter Expression rules.
    +   *    3.1 Updated filter expression attributes with child table attributes
    +   * 4. Update the Parent Logical relation with child Logical relation
    +   *
    +   * @param logicalPlan
    +   * parent logical plan
    +   * @param aggDataMapSchema
    +   * select data map schema
    +   * @param childCarbonRelation
    +   * child carbon table relation
    +   * @return transformed plan
    +   */
    +  def transformPreAggQueryPlan(logicalPlan: LogicalPlan,
    +      aggDataMapSchema: DataMapSchema, childCarbonRelation: LogicalRelation): LogicalPlan
= {
    +    logicalPlan.transform {
    +      case Aggregate(grExp, aggExp, child@SubqueryAlias(_, l: LogicalRelation, _))
    +        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +           &&
    +           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasPreAggDataMap
=>
    +        val (updatedGroupExp, updatedAggExp, newChild, None) =
    +          getUpdatedExpressions(grExp,
    +            aggExp,
    +            child,
    +            None,
    +            aggDataMapSchema,
    +            childCarbonRelation)
    +        Aggregate(updatedGroupExp,
    +          updatedAggExp,
    +          newChild)
    +      case Aggregate(grExp,
    +      aggExp,
    +      Filter(expression, child@SubqueryAlias(_, l: LogicalRelation, _)))
    +        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +           &&
    +           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasPreAggDataMap
=>
    +        val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
    +          getUpdatedExpressions(grExp,
    +            aggExp,
    +            child,
    +            Some(expression),
    +            aggDataMapSchema,
    +            childCarbonRelation)
    +        Aggregate(updatedGroupExp,
    +          updatedAggExp,
    +          Filter(updatedFilterExpression.get,
    +            newChild))
    +      case Aggregate(grExp, aggExp, l: LogicalRelation)
    +        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +           &&
    +           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasPreAggDataMap
=>
    +        val (updatedGroupExp, updatedAggExp, newChild, None) =
    +          getUpdatedExpressions(grExp,
    +            aggExp,
    +            l,
    +            None,
    +            aggDataMapSchema,
    +            childCarbonRelation)
    +        Aggregate(updatedGroupExp,
    +          updatedAggExp,
    +          newChild)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to get the updated expression for pre aggregated table.
    +   * It will replace the attribute of actual plan with child table attributes.
    +   * Updation will be done for below expression.
    +   * 1. Grouping expression
    +   * 2. aggregate expression
    +   * 3. child logical plan
    +   * 4. filter expression if present
    +   *
    +   * @param groupingExpressions
    +   * actual plan grouping expression
    +   * @param aggregateExpressions
    +   * actual plan aggregate expression
    +   * @param child
    +   * child logical plan
    +   * @param filterExpression
    +   * filter expression
    +   * @param aggDataMapSchema
    +   * pre aggregate table schema
    +   * @param childCarbonRelation
    +   * pre aggregate table logical relation
    +   * @return tuple of(updated grouping expression,
    +   *         updated aggregate expression,
    +   *         updated child logical plan,
    +   *         updated filter expression if present in actual plan)
    +   */
    +  def getUpdatedExpressions(groupingExpressions: Seq[Expression],
    +      aggregateExpressions: Seq[NamedExpression],
    +      child: LogicalPlan, filterExpression: Option[Expression] = None,
    +      aggDataMapSchema: DataMapSchema,
    +      childCarbonRelation: LogicalRelation): (Seq[Expression], Seq[NamedExpression],
LogicalPlan,
    +    Option[Expression]) = {
    +    // transforming the group by expression attributes with child attributes
    +    val updatedGroupExp = groupingExpressions.map { exp =>
    +      exp.transform {
    +        case attr: AttributeReference =>
    +          getChildAttributeReference(aggDataMapSchema, attr, childCarbonRelation)
    +      }
    +    }
    +    // below code is for updating the aggregate expression.
    +    // Note: In case of aggregate expression updation we need to return alias as
    +    //       while showing the final result we need to show based on actual query
    +    //       for example: If query is "select name from table group by name"
    +    //       if we only update the attributes it will show child table column name in
    +    //       final output, so for handling this if an attribute does not have an alias
    +    //       we need to return an alias of the parent table column name
    +    // Rules for updating aggregate expression.
    +    // 1. If it matches with attribute reference return alias of child attribute reference
    +    // 2. If it matches with alias return same alias with child attribute reference
    +    // 3. If it matches with alias of any supported aggregate function return aggregate
    +    // function with child attribute reference. Please check class level documentation for
    +    // how and when the aggregate function will be updated
    +
    +    val updatedAggExp = aggregateExpressions.map {
    +      exp => exp match {
    --- End diff --
    
    No need for this match; use the case clauses directly inside map.


---

Mime
View raw message