carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvramana <...@git.apache.org>
Subject [GitHub] incubator-carbondata pull request #641: [CARBONDATA-767] Alter table support...
Date Wed, 15 Mar 2017 06:50:51 GMT
Github user gvramana commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/641#discussion_r106095089
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
---
    @@ -136,6 +140,298 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel)
extends Runnab
       }
     }
     
    +/**
    + * Command that changes the data type, precision and scale of an existing column of a
    + * carbon table and records the change in the table's schema evolution history.
    + *
    + * The schema file is read, modified and written back while the table's metadata lock is
    + * held; afterwards the Hive table property holding the Spark schema is refreshed.
    + *
    + * @param alterTableDataTypeChangeModel table, optional database, column name and the
    + *                                      target data type information
    + */
    +private[sql] case class AlterTableDataTypeChange(
    +    alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) extends RunnableCommand {
    +
    +  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  /**
    +   * Runs the data type change.
    +   *
    +   * @param sparkSession session used to resolve the current database and the metastore
    +   * @return always an empty result
    +   * @throws RuntimeException if the table or the column does not exist, or if the
    +   *                          metadata lock cannot be acquired
    +   */
    +  def run(sparkSession: SparkSession): Seq[Row] = {
    +    val tableName = alterTableDataTypeChangeModel.tableName
    +    val dbName = alterTableDataTypeChangeModel.databaseName
    +      .getOrElse(sparkSession.catalog.currentDatabase)
    +    LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
    +    val relation =
    +      CarbonEnv.get.carbonMetastore
    +        .lookupRelation(Option(dbName), tableName)(sparkSession)
    +        .asInstanceOf[CarbonRelation]
    +    if (relation == null) {
    +      LOGGER.audit(s"Alter table change data type request has failed. " +
    +                   s"Table $dbName.$tableName does not exist")
    +      sys.error(s"Table $dbName.$tableName does not exist")
    +    }
    +    val table = relation.tableMeta.carbonTable
    +    val carbonLock = CarbonLockFactory
    +      .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
    +        LockUsage.METADATA_LOCK)
    +    // tracks whether the lock was really taken, so that finally only releases what we own
    +    var locked = false
    +    try {
    +      // acquire the metadata lock before reading or modifying the schema file
    +      locked = Option(carbonLock).exists(_.lockWithRetries())
    +      if (!locked) {
    +        LOGGER.audit(s"Alter table change data type request has failed. " +
    +                     s"Unable to acquire the metadata lock for table $dbName.$tableName")
    +        sys.error(s"Unable to acquire the metadata lock for table $dbName.$tableName")
    +      }
    +      // get the latest carbon table
    +      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
    +      val columnName = alterTableDataTypeChangeModel.columnName
    +      // read the latest schema file
    +      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
    +        carbonTable.getCarbonTableIdentifier)
    +      val tableMetadataFile = carbonTablePath.getSchemaFilePath
    +      val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
    +        .readSchemaFile(tableMetadataFile)
    +      // check for column existence before touching the schema
    +      val matchingColumns = tableInfo.fact_table.table_columns.asScala
    +        .filter(_.column_name.equalsIgnoreCase(columnName))
    +      if (matchingColumns.isEmpty) {
    +        LOGGER.audit(s"Alter table change data type request has failed. " +
    +                     s"Column $columnName does not exist in table $dbName.$tableName")
    +        sys.error(s"Column $columnName does not exist in table $dbName.$tableName")
    +      }
    +      // keep a copy of the column as it was, for the "removed" side of the history entry
    +      val deletedColumnSchema =
    +        CarbonScalaUtil.createColumnSchemaCopyObject(matchingColumns.last)
    +      val dataTypeInfo = alterTableDataTypeChangeModel.dataTypeInfo
    +      matchingColumns.foreach { columnSchema =>
    +        columnSchema.setData_type(
    +          DataTypeConverterUtil.convertToThriftDataType(dataTypeInfo.dataType))
    +        columnSchema.setPrecision(dataTypeInfo.precision)
    +        columnSchema.setScale(dataTypeInfo.scale)
    +      }
    +      // the modified column is the "added" side of the history entry
    +      val addColumnSchema = matchingColumns.last
    +      // one timestamp for both the new entry and the updated history entry
    +      val timeStamp = System.currentTimeMillis
    +      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
    +      schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
    +      schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
    +      tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
    +        .setTime_stamp(timeStamp)
    +      CarbonEnv.get.carbonMetastore
    +        .updateTableSchema(carbonTable.getCarbonTableIdentifier,
    +          tableInfo,
    +          schemaEvolutionEntry,
    +          carbonTable.getStorePath)(sparkSession)
    +
    +      // push the new schema to the Hive table properties and refresh the cached relation
    +      val tableIdentifier = TableIdentifier(tableName, Some(dbName))
    +      val schema = CarbonEnv.get.carbonMetastore
    +        .lookupRelation(tableIdentifier)(sparkSession).schema.json
    +      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
    +        s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
    +      sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
    +      LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
    +      LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
    +    } catch {
    +      case e: Exception =>
    +        LOGGER.error("Alter table change datatype failed : " + e.getMessage)
    +        throw e
    +    } finally {
    +      // release the lock after command execution, but only if it was actually acquired
    +      if (locked) {
    +        if (carbonLock.unlock()) {
    +          LOGGER.info("Alter table change data type lock released successfully")
    +        } else {
    +          LOGGER.error("Unable to release lock during alter table change data type operation")
    +        }
    +      }
    +    }
    +    Seq.empty
    +  }
    +}
    +
    +private[sql] case class AlterTableAddColumns(
    +    alterTableAddColumnsModel: AlterTableAddColumnsModel) extends RunnableCommand {
    +
    +  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  def run(sparkSession: SparkSession): Seq[Row] = {
    +    val tableName = alterTableAddColumnsModel.tableName
    +    val dbName = alterTableAddColumnsModel.databaseName
    +      .getOrElse(sparkSession.catalog.currentDatabase)
    +    LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
    +    val relation =
    +      CarbonEnv.get.carbonMetastore
    +        .lookupRelation(Option(dbName), tableName)(sparkSession)
    +        .asInstanceOf[CarbonRelation]
    +    if (relation == null) {
    +      LOGGER.audit(s"Alter table add columns request has failed. " +
    +                   s"Table $dbName.$tableName does not exist")
    +      sys.error(s"Table $dbName.$tableName does not exist")
    +    }
    +    // acquire the lock first
    +    val table = relation.tableMeta.carbonTable
    +    val carbonLock = CarbonLockFactory
    +      .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
    +        LockUsage.METADATA_LOCK)
    +    try {
    +      // get the latest carbon table and check for column existence
    +      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
    +      // read the latest schema file
    +      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
    +        carbonTable.getCarbonTableIdentifier)
    +      val tableMetadataFile = carbonTablePath.getSchemaFilePath
    +      val thriftTableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
    +        .readSchemaFile(tableMetadataFile)
    +      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
    +      val wrapperTableInfo = schemaConverter
    +        .fromExternalToWrapperTableInfo(thriftTableInfo,
    +          dbName,
    +          tableName,
    +          carbonTable.getStorePath)
    +      val newCols = new AlterTableProcessor(alterTableAddColumnsModel,
    +        dbName,
    +        wrapperTableInfo,
    +        carbonTablePath,
    +        carbonTable.getCarbonTableIdentifier,
    +        carbonTable.getStorePath).process
    +      val schemaEvolutionEntry = new org.apache.carbondata.core.metadata
    +      .schema.SchemaEvolutionEntry()
    +      schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis)
    +      schemaEvolutionEntry.setAdded(newCols.toList.asJava)
    +
    +      val thriftTable = schemaConverter
    +        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
    +      thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
    +        .setTime_stamp(System.currentTimeMillis)
    +      CarbonEnv.get.carbonMetastore
    +        .updateTableSchema(carbonTable.getCarbonTableIdentifier,
    --- End diff --
    
    Write common functions, avoid duplicate code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message