carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipesala <...@git.apache.org>
Subject [GitHub] carbondata pull request #1488: [CARBONDATA-1528] [PreAgg] Add validation for...
Date Sun, 12 Nov 2017 10:58:59 GMT
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1488#discussion_r150406795
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/listeners/PreAggregateRenameTablePostListener.scala
---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.preaaggregate.listeners
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{CarbonEnv, SparkSession}
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.execution.command.AlterTableRenameModel
    +import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
    +
    +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
    +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
    +import org.apache.carbondata.events.{AlterTableRenamePostEvent, Event, OperationContext,
OperationEventListener}
    +import org.apache.carbondata.format.SchemaEvolutionEntry
    +
    +object PreAggregateRenameTablePostListener extends OperationEventListener {
    +
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    +    val renameTablePostListener = event.asInstanceOf[AlterTableRenamePostEvent]
    +    val carbonTable = renameTablePostListener.carbonTable
    +    implicit val sparkSession: SparkSession = renameTablePostListener.sparkSession
    +    val renameTableModel = renameTablePostListener.alterTableRenameModel
    +    val oldParentTableName = renameTableModel.oldTableIdentifier.table
    +    val oldParentDatabaseName = renameTableModel.oldTableIdentifier.database.getOrElse("default")
    +    val newParentTableName = renameTableModel.newTableIdentifier.table
    +    if (carbonTable.hasPreAggregateTables) {
    +      val dataMapSchemas = carbonTable.getTableInfo.getDataMapSchemaList.asScala
    +      dataMapSchemas.foreach {
    +        dataMapSchema =>
    +          val childTableIdentifier = dataMapSchema.getRelationIdentifier
    +          val childCarbonTable = PreAggregateUtil
    +            .getChildCarbonTable(childTableIdentifier.getDatabaseName,
    +              childTableIdentifier.getTableName)(sparkSession)
    +          updateChildTableWithNewParent(renameTableModel, childCarbonTable)(sparkSession)
    +      }
    +    }
    +    val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
    +    schemaEvolutionEntry.setTime_stamp(System.currentTimeMillis())
    +    updateParentRelationIdentifierForColumns(carbonTable.getTableInfo,
    +      oldParentDatabaseName,
    +      oldParentTableName,
    +      newParentTableName)
    +    updateTableSchema(carbonTable,
    +      schemaEvolutionEntry,
    +      oldParentDatabaseName,
    +      oldParentTableName)
    +  }
    +
    +  private def updateChildTableWithNewParent(renameTableModel: AlterTableRenameModel,
    +      childCarbonTable: Option[CarbonTable])(implicit sparkSession: SparkSession): Unit
= {
    +    val oldParentTableName = renameTableModel.oldTableIdentifier.table
    +    val oldParentDatabaseName = renameTableModel.oldTableIdentifier.database.getOrElse("default")
    +    val newParentTableName = renameTableModel.newTableIdentifier.table
    +    if (childCarbonTable.isDefined) {
    +      val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
    +      schemaEvolutionEntry.setTime_stamp(System.currentTimeMillis())
    +      val childTableInfo = childCarbonTable.get.getTableInfo
    +      childTableInfo.getParentRelationIdentifiers.asScala.foreach {
    +        parentRelationIdentifier =>
    +          if (parentRelationIdentifier.getDatabaseName
    +                .equalsIgnoreCase(oldParentDatabaseName) &&
    +              parentRelationIdentifier.getTableName
    +                .equalsIgnoreCase(oldParentTableName)) {
    +            parentRelationIdentifier.setTableName(newParentTableName)
    +          }
    +      }
    +      updateTableSchema(childCarbonTable.get,
    +        schemaEvolutionEntry,
    +        childTableInfo.getDatabaseName,
    +        childTableInfo.getFactTable.getTableName)
    +    }
    +  }
    +
    +  private def updateParentRelationIdentifierForColumns(parentTableInfo: TableInfo,
    +      oldParentDatabaseName: String,
    +      oldParentTableName: String,
    +      newParentTableName: String): Unit = {
    +    parentTableInfo.getFactTable.getListOfColumns.asScala.foreach {
    +      columnSchema =>
    --- End diff --
    
    Ok, please block it for now.


---

Mime
View raw message