carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [37/50] carbondata git commit: changed revert condition
Date Thu, 11 May 2017 13:53:27 GMT
changed revert condition


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0d122d7c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0d122d7c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0d122d7c

Branch: refs/heads/branch-1.1
Commit: 0d122d7c16c202448caab50e5af9e603931b0bcb
Parents: a860b87
Author: kunal642 <kunal.kapoor@knoldus.in>
Authored: Mon Apr 24 12:22:57 2017 +0530
Committer: chenliang613 <chenliang613@huawei.com>
Committed: Tue May 9 15:24:10 2017 +0800

----------------------------------------------------------------------
 .../execution/command/AlterTableCommands.scala  | 100 +++++++++++--------
 .../org/apache/spark/util/AlterTableUtil.scala  |  24 ++---
 .../rowreader/AddColumnTestCases.scala          |  21 ++++
 3 files changed, 93 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
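
In short, the commit makes two related fixes to the ALTER TABLE commands. First, lookupRelation is now called only after the metadata and compaction locks are acquired, so an alter that committed while we were waiting for the lock is always visible (see the comment added in AlterTableAddColumns below). Second, the revert helpers in AlterTableUtil now revert only when the latest schema-evolution entry carries the exact timestamp stamped by the failing operation (updatedTime == timeStamp), rather than whenever it is newer than the table's last-updated time, and the revert and lock-release paths are guarded against a null carbonTable for failures that occur before the lookup.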


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d122d7c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 280d459..4ac3ea2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -29,9 +29,10 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
@@ -51,15 +52,20 @@ private[sql] case class AlterTableAddColumns(
     LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     var locks = List.empty[ICarbonLock]
-    var lastUpdatedTime = 0L
+    var timeStamp = 0L
     var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
+    var carbonTable: CarbonTable = null
     try {
-      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+      // Consider a concurrent scenario where two alter operations are executed in parallel.
+      // The 1st operation succeeds and updates the schema file. The 2nd operation gets the
+      // lock after the 1st completes, but because lookupRelation was called earlier it holds
+      // the older carbon table, which can leave the system in an inconsistent state. Therefore
+      // lookupRelation should be called after acquiring the lock.
+      carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
       // get the latest carbon table and check for column existence
       // read the latest schema file
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
@@ -84,8 +90,9 @@ private[sql] case class AlterTableAddColumns(
         newCols,
         carbonTable.getCarbonTableIdentifier,
         carbonTable.getStorePath).collect()
+      timeStamp = System.currentTimeMillis
       val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
-      schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis)
+      schemaEvolutionEntry.setTimeStamp(timeStamp)
       schemaEvolutionEntry.setAdded(newCols.toList.asJava)
       val thriftTable = schemaConverter
         .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
@@ -105,7 +112,7 @@ private[sql] case class AlterTableAddColumns(
             newCols,
             carbonTable.getCarbonTableIdentifier,
             carbonTable.getStorePath).collect()
-          AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
+          AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
         }
         sys.error(s"Alter table add operation failed: ${e.getMessage}")
     } finally {
@@ -155,13 +162,15 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       LockUsage.CLEAN_FILES_LOCK,
       LockUsage.DROP_TABLE_LOCK)
     var locks = List.empty[ICarbonLock]
-    var lastUpdatedTime = 0L
-    val carbonTable = relation.tableMeta.carbonTable
+    var timeStamp = 0L
+    var carbonTable: CarbonTable = null
     try {
-      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       locks = AlterTableUtil
         .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
             sparkSession)
+      carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+        .asInstanceOf[CarbonRelation].tableMeta.carbonTable
       // get the latest carbon table and check for column existence
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
@@ -170,7 +179,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
         .carbonMetastore.readSchemaFile(tableMetadataFile)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
       schemaEvolutionEntry.setTableName(newTableName)
-      schemaEvolutionEntry.setTime_stamp(System.currentTimeMillis())
+      timeStamp = System.currentTimeMillis()
+      schemaEvolutionEntry.setTime_stamp(timeStamp)
       renameBadRecords(oldTableName, newTableName, oldDatabaseName)
       val fileType = FileFactory.getFileType(tableMetadataFile)
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
@@ -205,25 +215,29 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
     } catch {
       case e: Exception => LOGGER
         .error("Rename table failed: " + e.getMessage)
-        AlterTableUtil
-          .revertRenameTableChanges(oldTableIdentifier,
-            newTableName,
-            carbonTable.getStorePath,
-            carbonTable.getCarbonTableIdentifier.getTableId,
-            lastUpdatedTime)(
-            sparkSession)
-        renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+        if (carbonTable != null) {
+          AlterTableUtil
+            .revertRenameTableChanges(oldTableIdentifier,
+              newTableName,
+              carbonTable.getStorePath,
+              carbonTable.getCarbonTableIdentifier.getTableId,
+              timeStamp)(
+              sparkSession)
+          renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+        }
         sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks)
       // case specific to rename table as after table rename old table path will not be found
-      AlterTableUtil
-        .releaseLocksManually(locks,
-          locksToBeAcquired,
-          oldDatabaseName,
-          newTableName,
-          carbonTable.getStorePath)
+      if (carbonTable != null) {
+        AlterTableUtil
+          .releaseLocksManually(locks,
+            locksToBeAcquired,
+            oldDatabaseName,
+            newTableName,
+            carbonTable.getStorePath)
+      }
     }
     Seq.empty
   }
@@ -258,16 +272,16 @@ private[sql] case class AlterTableDropColumns(
       .getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
     var locks = List.empty[ICarbonLock]
-    var lastUpdatedTime = 0L
+    var timeStamp = 0L
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     // get the latest carbon table and check for column existence
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
+    var carbonTable: CarbonTable = null
     try {
-      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+      carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
       // check each column existence in the table
       val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
       var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
@@ -318,7 +332,8 @@ private[sql] case class AlterTableDropColumns(
         }
       }
       // add deleted columns to schema evolution history and update the schema
-      val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
+      timeStamp = System.currentTimeMillis
+      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
       schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
       AlterTableUtil
         .updateSchemaInfo(carbonTable,
@@ -336,7 +351,9 @@ private[sql] case class AlterTableDropColumns(
     } catch {
       case e: Exception => LOGGER
         .error("Alter table drop columns failed : " + e.getMessage)
-        AlterTableUtil.revertDropColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
+        if (carbonTable != null) {
+          AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession)
+        }
         sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
@@ -359,14 +376,14 @@ private[sql] case class AlterTableDataTypeChange(
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     var locks = List.empty[ICarbonLock]
     // get the latest carbon table and check for column existence
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
-      .carbonTable
-    var lastUpdatedTime = 0L
+    var carbonTable: CarbonTable = null
+    var timeStamp = 0L
     try {
-      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+      carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
       val columnName = alterTableDataTypeChangeModel.columnName
       val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
       if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
@@ -403,7 +420,8 @@ private[sql] case class AlterTableDataTypeChange(
           addColumnSchema = columnSchema
         }
       }
-      val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
+      timeStamp = System.currentTimeMillis
+      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
       schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
       schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
       tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
@@ -418,7 +436,9 @@ private[sql] case class AlterTableDataTypeChange(
     } catch {
       case e: Exception => LOGGER
         .error("Alter table change datatype failed : " + e.getMessage)
-        AlterTableUtil.revertDataTypeChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
+        if (carbonTable != null) {
+          AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
+        }
         sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion

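Distilled out of CarbonData, the lock-then-lookup ordering introduced above looks roughly like the sketch below. Lock, Metastore and Table are hypothetical stand-ins, not CarbonData APIs; the point is only that the metadata read happens inside the critical section.

    // Minimal sketch of the lock-then-lookup ordering, with hypothetical
    // stand-ins for the CarbonData lock and metastore types.
    object LockThenLookup {
      trait Lock { def release(): Unit }
      final case class Table(name: String, schemaVersion: Long)

      final class Metastore {
        @volatile private var current = Table("t1", schemaVersion = 1L)
        // real code would block here until the lock is granted
        def acquireLock(): Lock = new Lock { def release(): Unit = () }
        def lookup(): Table = current
        def commit(t: Table): Unit = current = t
      }

      def alter(metastore: Metastore)(change: Table => Table): Unit = {
        val lock = metastore.acquireLock() // 1. serialize concurrent alters first
        try {
          // 2. only now read the table, so an alter that committed while we
          //    were waiting for the lock is reflected in what we see
          val table = metastore.lookup()
          metastore.commit(change(table))
        } finally {
          lock.release()
        }
      }

      def main(args: Array[String]): Unit = {
        val ms = new Metastore
        alter(ms)(t => t.copy(schemaVersion = t.schemaVersion + 1))
        println(ms.lookup()) // Table(t1,2)
      }
    }

Run as-is, main prints Table(t1,2); with two threads calling alter, whichever acquires the lock second still reads the first thread's committed schema, which is exactly the guarantee the reordering above provides.
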
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d122d7c/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 349c1d9..d7b1422 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -194,14 +194,14 @@ object AlterTableUtil {
    *
    * @param oldTableIdentifier
    * @param newTableName
-   * @param lastUpdatedTime
+   * @param timeStamp
    * @param sparkSession
    */
   def revertRenameTableChanges(oldTableIdentifier: TableIdentifier,
       newTableName: String,
       storePath: String,
       tableId: String,
-      lastUpdatedTime: Long)
+      timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
     val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
     val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
@@ -215,7 +215,7 @@ object AlterTableUtil {
         .readSchemaFile(tableMetadataFile)
       val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
       val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
-      if (updatedTime > lastUpdatedTime) {
+      if (updatedTime == timeStamp) {
         LOGGER.error(s"Reverting changes for $database.${ oldTableIdentifier.table }")
         FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
          .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
@@ -237,10 +237,10 @@ object AlterTableUtil {
    *
    * @param dbName
    * @param tableName
-   * @param lastUpdatedTime
+   * @param timeStamp
    * @param sparkSession
    */
-  def revertAddColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
+  def revertAddColumnChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
     val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
@@ -253,7 +253,7 @@ object AlterTableUtil {
       .readSchemaFile(tableMetadataFile)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
-    if (updatedTime > lastUpdatedTime) {
+    if (updatedTime == timeStamp) {
       LOGGER.info(s"Reverting changes for $dbName.$tableName")
       val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
       thriftTable.fact_table.table_columns.removeAll(addedSchemas)
@@ -268,10 +268,10 @@ object AlterTableUtil {
    *
    * @param dbName
    * @param tableName
-   * @param lastUpdatedTime
+   * @param timeStamp
    * @param sparkSession
    */
-  def revertDropColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
+  def revertDropColumnChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
     val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
@@ -283,7 +283,7 @@ object AlterTableUtil {
       .readSchemaFile(tableMetadataFile)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
-    if (updatedTime > lastUpdatedTime) {
+    if (updatedTime == timeStamp) {
       LOGGER.error(s"Reverting changes for $dbName.$tableName")
       val removedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).removed
       thriftTable.fact_table.table_columns.asScala.foreach { columnSchema =>
@@ -304,10 +304,10 @@ object AlterTableUtil {
    *
    * @param dbName
    * @param tableName
-   * @param lastUpdatedTime
+   * @param timeStamp
    * @param sparkSession
    */
-  def revertDataTypeChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
+  def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
     val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
@@ -319,7 +319,7 @@ object AlterTableUtil {
       .readSchemaFile(tableMetadataFile)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
-    if (updatedTime > lastUpdatedTime) {
+    if (updatedTime == timeStamp) {
       LOGGER.error(s"Reverting changes for $dbName.$tableName")
       val removedColumns = evolutionEntryList.get(evolutionEntryList.size() - 1).removed
       thriftTable.fact_table.table_columns.asScala.foreach { columnSchema =>

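The revert condition itself is easiest to see with a small worked example. Suppose our operation records lastUpdatedTime = 100 before locking, a parallel alter commits an evolution entry at t = 150, and our own operation then fails before writing anything. Under the old check (updatedTime > lastUpdatedTime) the entry at 150 looks like ours and gets reverted; under the new check we revert only when the newest entry carries the timestamp we ourselves stamped. A sketch with a hypothetical EvolutionEntry type:

    // Old vs. new revert guard, with a hypothetical EvolutionEntry type.
    object RevertGuard {
      final case class EvolutionEntry(timeStamp: Long)

      // Old: revert whenever the newest entry postdates the time captured
      // before locking -- a parallel alter's entry also satisfies this.
      def shouldRevertOld(history: List[EvolutionEntry], lastUpdatedTime: Long): Boolean =
        history.last.timeStamp > lastUpdatedTime

      // New: revert only if the newest entry is the one this operation wrote,
      // identified by the exact timestamp it stamped into the entry.
      def shouldRevertNew(history: List[EvolutionEntry], timeStamp: Long): Boolean =
        history.last.timeStamp == timeStamp

      def main(args: Array[String]): Unit = {
        // A parallel alter committed at t = 150; we failed before writing,
        // so our own timeStamp is still its initial value of 0.
        val history = List(EvolutionEntry(50L), EvolutionEntry(150L))
        println(shouldRevertOld(history, lastUpdatedTime = 100L)) // true: would wrongly revert
        println(shouldRevertNew(history, timeStamp = 0L))         // false: nothing to revert
      }
    }
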
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d122d7c/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/AddColumnTestCases.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/AddColumnTestCases.scala
index f994083..7c94ebf 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/rowreader/AddColumnTestCases.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.carbondata.restructure.rowreader
 
+import java.io.{File, FileOutputStream, FileWriter}
 import java.math.{BigDecimal, RoundingMode}
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
 
 class AddColumnTestCases extends QueryTest with BeforeAndAfterAll {
@@ -207,6 +209,25 @@ class AddColumnTestCases extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test ("test to check if exception is thrown if table is locked for update") {
+    intercept[Exception] {
+      sql("DROP TABLE IF EXISTS carbon_table")
+      sql(
+        "CREATE TABLE carbon_table(intField int,stringField string,charField string,timestampField " +
+        "timestamp)STORED BY 'carbondata' TBLPROPERTIES" +
+        "('DICTIONARY_EXCLUDE'='charField')")
+      val lockFilePath = s"${ TestQueryExecutor.storeLocation }/default/carbon_table/meta.lock"
+      new File(lockFilePath).createNewFile()
+      sql(
+        "Alter table carbon_table add columns(newfield string) TBLPROPERTIES ('DEFAULT.VALUE.newfield'='c')")
+      new FileOutputStream(lockFilePath).getChannel.lock()
+      sql(
+        "Alter table carbon_table drop columns(newfield)")
+      new File(lockFilePath).delete()
+      sql("DROP TABLE IF EXISTS carbon_table")
+    }
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS addcolumntest")
     sql("drop table if exists hivetable")

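For context on the new test: it simulates a concurrently held lock by creating the meta.lock file under the store location, taking a FileChannel lock on it, and then issuing the drop-columns ALTER, which is expected to fail and is asserted by the surrounding intercept[Exception].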
