carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzcclp <...@git.apache.org>
Subject [GitHub] incubator-carbondata pull request #492: [CARBONDATA-440] Providing the updat...
Date Fri, 06 Jan 2017 17:36:26 GMT
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/492#discussion_r94985585
  
    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
---
    @@ -72,23 +74,71 @@ object CarbonOptimizer {
     class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
       extends Rule[LogicalPlan] with PredicateHelper {
       val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    -  def apply(plan: LogicalPlan): LogicalPlan = {
    -    if (relations.nonEmpty && !isOptimized(plan)) {
    +  def apply(logicalPlan: LogicalPlan): LogicalPlan = {
    +    if (relations.nonEmpty && !isOptimized(logicalPlan)) {
    +      val plan = processPlan(logicalPlan)
    +      val udfTransformedPlan = pushDownUDFToJoinLeftRelation(plan)
           LOGGER.info("Starting to optimize plan")
           val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
           val queryStatistic = new QueryStatistic()
    -      val result = transformCarbonPlan(plan, relations)
    +      val result = transformCarbonPlan(udfTransformedPlan, relations)
           queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ",
             System.currentTimeMillis)
           recorder.recordStatistics(queryStatistic)
           recorder.logStatistics()
           result
         } else {
           LOGGER.info("Skip CarbonOptimizer")
    -      plan
    +      logicalPlan
         }
       }
     
    +  private def processPlan(plan: LogicalPlan): LogicalPlan = {
    +    plan transform {
    +      case ProjectForUpdate(table, cols, Seq(updatePlan)) =>
    +        var isTransformed = false
    +        val newPlan = updatePlan transform {
    +          case Project(pList, child) if (!isTransformed) =>
    +            val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
    +              .splitAt(pList.size - cols.size)
    +            val diff = cols.diff(dest.map(_.name))
    +            if (diff.size > 0) {
    +              sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
    +            }
    +            isTransformed = true
    +            Project(dest.filter(a => !cols.contains(a.name)) ++ source, child)
    +        }
    +        ProjectForUpdateCommand(newPlan, table.tableIdentifier)
    --- End diff --
    
    @ravikiran23 @jackylk UnresolvedRelation.tableIdentifier is a 'Seq[String]' type in Spark-1.5,
while in Spark-1.6 it is a 'TableIdentifier' type, so this fails to compile against Spark-1.6.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message