carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [18/24] carbondata git commit: [CARBONDATA-1229] acquired meta.lock during table drop
Date Wed, 12 Jul 2017 15:09:30 GMT
[CARBONDATA-1229] acquired meta.lock during table drop

This closes #1153


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/403c3d9b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/403c3d9b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/403c3d9b

Branch: refs/heads/streaming_ingest
Commit: 403c3d9b41e166311ac45ec33b375cbecc8c4741
Parents: 619f1f9
Author: kunalkapoor <kunalkapoor642@gmail.com>
Authored: Mon Jul 10 12:12:10 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Mon Jul 10 19:32:43 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/locks/CarbonLockUtil.java   | 24 +++++++++
 .../execution/command/carbonTableSchema.scala   | 52 +++++++++-----------
 .../org/apache/spark/util/AlterTableUtil.scala  | 25 +---------
 3 files changed, 50 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/403c3d9b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
index fba03a1..eaaaf94 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.locks;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
 /**
  * This class contains all carbon lock utilities
@@ -60,4 +61,27 @@ public class CarbonLockUtil {
       }
     }
   }
+
+  /**
+   * Given a lock type this method will return a new lock object if not acquired by any other
+   * operation
+   *
+   * @param carbonTable
+   * @param lockType
+   * @return
+   */
+  public static ICarbonLock getLockObject(CarbonTable carbonTable,
+                            String lockType) {
+    ICarbonLock carbonLock = CarbonLockFactory
+            .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier().getCarbonTableIdentifier(),
+                    lockType);
+    LOGGER.info("Trying to acquire lock: " + carbonLock);
+    if (carbonLock.lockWithRetries()) {
+      LOGGER.info("Successfully acquired the lock " + carbonLock);
+    } else {
+      throw new RuntimeException("Table is locked for updation. Please try after some time");
+    }
+    return carbonLock;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/403c3d9b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 8e7db45..2e5812c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.sql.execution.command
 
-import java.io.File
-
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 import scala.language.implicitConversions
 
 import org.apache.commons.lang3.StringUtils
@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation, HiveExternalCatalog}
+import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
 import org.apache.spark.util.FileUtils
 import org.codehaus.jackson.map.ObjectMapper
 
@@ -41,10 +40,10 @@ import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock,
LockUsage}
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -834,24 +833,17 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
     val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
     val identifier = TableIdentifier(tableName, Option(dbName))
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
-    val carbonLock = CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier.getDatabaseName,
-        carbonTableIdentifier.getTableName + CarbonCommonConstants.UNDERSCORE +
-        LockUsage.DROP_TABLE_LOCK)
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
     val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val storePath = catalog.storePath
-    var isLocked = false
     catalog.checkSchemasModifiedTimeAndReloadTables()
+    val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
-      isLocked = carbonLock.lockWithRetries()
-      if (isLocked) {
-        logInfo("Successfully able to get the lock for drop.")
-      }
-      else {
-        LOGGER.audit(s"Dropping table $dbName.$tableName failed as the Table is locked")
-        sys.error("Table is locked for deletion. Please try after some time")
+      val carbonTable = catalog.getTableFromMetadata(dbName, tableName).map(_.carbonTable).orNull
+       locksToBeAcquired foreach {
+        lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTable, lock)
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-      val carbonTable = catalog.getTableFromMetadata(dbName, tableName).map(_.carbonTable).orNull
       if (null != carbonTable) {
         // clear driver B-tree and dictionary cache
         ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
@@ -859,18 +851,22 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
       CarbonEnv.getInstance(sparkSession).carbonMetastore
         .dropTable(storePath, identifier)(sparkSession)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
     } finally {
-      if (carbonLock != null && isLocked) {
-        if (carbonLock.unlock()) {
-          logInfo("Table MetaData Unlocked Successfully after dropping the table")
-          // deleting any remaining files.
-          val metadataFilePath = CarbonStorePath
-            .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
-          val fileType = FileFactory.getFileType(metadataFilePath)
-          if (FileFactory.isFileExist(metadataFilePath, fileType)) {
-            val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
-            CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
-          }
+      if (carbonLocks.nonEmpty) {
+        val unlocked = carbonLocks.forall(_.unlock())
+        if (unlocked) {
+          logInfo("Table MetaData Unlocked Successfully")
+        }
+        // deleting any remaining files.
+        val metadataFilePath = CarbonStorePath
+          .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
+        val fileType = FileFactory.getFileType(metadataFilePath)
+        if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+          val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+          CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/403c3d9b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 9e402cd..87717fb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock}
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock}
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -65,7 +65,7 @@ object AlterTableUtil {
     val acquiredLocks = ListBuffer[ICarbonLock]()
     try {
       locksToBeAcquired.foreach { lock =>
-        acquiredLocks += getLockObject(table, lock)
+        acquiredLocks += CarbonLockUtil.getLockObject(table, lock)
       }
       acquiredLocks.toList
     } catch {
@@ -76,27 +76,6 @@ object AlterTableUtil {
   }
 
   /**
-   * Given a lock type this method will return a new lock object if not acquired by any other
-   * operation
-   *
-   * @param carbonTable
-   * @param lockType
-   * @return
-   */
-  private def getLockObject(carbonTable: CarbonTable,
-      lockType: String): ICarbonLock = {
-    val carbonLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        lockType)
-    if (carbonLock.lockWithRetries()) {
-      LOGGER.info(s"Successfully acquired the lock $lockType")
-    } else {
-      sys.error("Table is locked for updation. Please try after some time")
-    }
-    carbonLock
-  }
-
-  /**
    * This method will release the locks acquired for an operation
    *
    * @param locks


Mime
View raw message