carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipesala <...@git.apache.org>
Subject [GitHub] carbondata pull request #954: [WIP] IUD support in 2.1
Date Mon, 29 May 2017 06:07:36 GMT
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/954#discussion_r118870584
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
---
    @@ -73,3 +78,136 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
         }
       }
     }
    +
    +object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
    +
    +  var sparkSession: SparkSession = _
    +
    +  def init(sparkSession: SparkSession) {
    +     this.sparkSession = sparkSession
    +  }
    +
    +  private def processUpdateQuery(
    +      table: UnresolvedRelation,
    +      columns: List[String],
    +      selectStmt: String,
    +      filter: String): LogicalPlan = {
    +    var includedDestColumns = false
    +    var includedDestRelation = false
    +    var addedTupleId = false
    +
    +    def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = {
    +      val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
    +        Seq.empty, isDistinct = false), "tupleId")())
    +      val projList = Seq(
    +        UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId)
    +      // include tuple id and rest of the required columns in subqury
    +      SubqueryAlias(table.alias.getOrElse(""), Project(projList, relation), Option(table.tableIdentifier))
    +    }
    +    // get the un-analyzed logical plan
    +    val targetTable = prepareTargetReleation(table)
    +    val selectPlan = new SparkSqlParser(sparkSession.sessionState.conf).parsePlan(selectStmt)
transform {
    +      case Project(projectList, child) if (!includedDestColumns) =>
    +        includedDestColumns = true
    +        if (projectList.size != columns.size) {
    +          sys.error("Number of source and destination columns are not matching")
    +        }
    +        val renamedProjectList = projectList.zip(columns).map{ case(attr, col) =>
    +          attr match {
    +            case UnresolvedAlias(child22, _) =>
    +              UnresolvedAlias(Alias(child22, col + "-updatedColumn")())
    +            case UnresolvedAttribute(param) =>
    +              UnresolvedAlias(Alias(attr, col + "-updatedColumn")())
    +             // UnresolvedAttribute(col + "-updatedColumn")
    +//              UnresolvedAlias(Alias(child, col + "-updatedColumn")())
    +            case _ => attr
    +          }
    +        }
    +        val list = Seq(
    +          UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq)))) ++ renamedProjectList
    +        Project(list, child)
    +      case Filter(cond, child) if (!includedDestRelation) =>
    +        includedDestRelation = true
    +        Filter(cond, Join(child, targetTable, Inner, None))
    +      case r @ UnresolvedRelation(t, a) if (!includedDestRelation &&
    +                                            t != table.tableIdentifier) =>
    +        includedDestRelation = true
    +        Join(r, targetTable, Inner, None)
    +    }
    +    val updatedSelectPlan : LogicalPlan = if (!includedDestRelation) {
    +      // special case to handle self join queries
    +      // Eg. update tableName  SET (column1) = (column1+1)
    +      selectPlan transform {
    +        case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier
&&
    +                                              addedTupleId == false) =>
    +          addedTupleId = true
    +          targetTable
    +      }
    +    } else {
    +      selectPlan
    +    }
    +    val finalPlan = if (filter.length > 0) {
    +      val alias = table.alias.getOrElse("")
    +      var transformed: Boolean = false
    +      // Create a dummy projection to include filter conditions
    +      var newPlan: LogicalPlan = null
    +      if (table.tableIdentifier.database.isDefined) {
    +        newPlan = new SparkSqlParser(sparkSession.sessionState.conf).parsePlan("select
* from  " +
    +                                                                     table.tableIdentifier.database
    +                                                                       .getOrElse("")
+ "." +
    +                                                                     table.tableIdentifier.table
+
    +                                                                     " " + alias + "
" +
    +                                                                     filter)
    +      }
    +      else {
    +        newPlan = new SparkSqlParser(sparkSession.sessionState.conf).parsePlan("select
* from  " +
    +                                                                     table.tableIdentifier.table
+
    +                                                                     " " + alias + "
" +
    +                                                                     filter)
    +      }
    +      newPlan transform {
    +        case UnresolvedRelation(t, Some(a)) if (
    +          !transformed && t == table.tableIdentifier && a == alias) =>
    +          transformed = true
    +          // Add the filter condition of update statement  on destination table
    +          SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier))
    +      }
    +    } else {
    +      updatedSelectPlan
    +    }
    +    val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
    +    val tidSeq = Seq(getDB.getDatabaseName(tid.database, sparkSession))
    +    val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias)
    +    ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
    +  }
    +
    +  def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan
= {
    +   // val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
    +   val tidSeq = Seq(getDB.getDatabaseName(table.tableIdentifier.database, sparkSession),
    +     table.tableIdentifier.table)
    +    var addedTupleId = false
    +    val selectPlan = new SparkSqlParser(sparkSession.sessionState.conf).parsePlan(selectStmt)
transform {
    +      case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier
&&
    +                                            addedTupleId == false) =>
    +        addedTupleId = true
    +        val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
    +          Seq.empty, isDistinct = false), "tupleId")())
    +        val projList = Seq(
    +          UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId)
    --- End diff --
    
    This won't work if there is no alias for the table. If there is no alias, then it should be passed
as None


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message