carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-1229] restrict drop when loading is in progress
Date Wed, 02 Aug 2017 16:09:45 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 545e93db3 -> 414ea7730


[CARBONDATA-1229] restrict drop when loading is in progress

This PR will restrict the table from getting dropped if a load is in progres

This closes #1168


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/414ea773
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/414ea773
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/414ea773

Branch: refs/heads/master
Commit: 414ea7730d418e6580bb3f8f52e19812e92e3620
Parents: 545e93d
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Thu Jul 13 00:55:56 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Thu Aug 3 00:09:28 2017 +0800

----------------------------------------------------------------------
 .../execution/command/carbonTableSchema.scala   | 32 ++++++++++----------
 .../execution/command/carbonTableSchema.scala   | 23 ++++++++++++--
 2 files changed, 36 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/414ea773/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 9a02d13..00dfaec 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command
 import java.io.File
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 import scala.language.implicitConversions
 
 import org.apache.commons.lang3.StringUtils
@@ -41,7 +42,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock,
LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
@@ -767,27 +768,26 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp:
O
     val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
     val identifier = TableIdentifier(tableName, Option(dbName))
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
-    val carbonLock = CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier.getDatabaseName,
-        carbonTableIdentifier.getTableName + CarbonCommonConstants.UNDERSCORE +
-        LockUsage.DROP_TABLE_LOCK)
-    val storePath = CarbonEnv.get.carbonMetastore.storePath
-    var isLocked = false
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
+    val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
+    val catalog = CarbonEnv.get.carbonMetastore
+    val storePath = catalog.storePath
     try {
-      isLocked = carbonLock.lockWithRetries()
-      if (isLocked) {
-        logInfo("Successfully able to get the lock for drop.")
-      }
-      else {
-        LOGGER.audit(s"Dropping table $dbName.$tableName failed as the Table is locked")
-        sys.error("Table is locked for deletion. Please try after some time")
+      locksToBeAcquired foreach {
+        lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
       CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sqlContext)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
+        sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}")
     } finally {
-      if (carbonLock != null && isLocked) {
-        if (carbonLock.unlock()) {
-          logInfo("Table MetaData Unlocked Successfully after dropping the table")
+      if (carbonLocks.nonEmpty) {
+        val unlocked = carbonLocks.forall(_.unlock())
+        if (unlocked) {
+          logInfo("Table MetaData Unlocked Successfully")
           // deleting any remaining files.
           val metadataFilePath = CarbonStorePath
             .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/414ea773/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d34b91d..80b1436 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -36,6 +36,7 @@ import org.codehaus.jackson.map.ObjectMapper
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
@@ -874,29 +875,45 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
     val identifier = TableIdentifier(tableName, Option(dbName))
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
-    val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonEnv = CarbonEnv.getInstance(sparkSession)
+    val catalog = carbonEnv.carbonMetastore
+    val storePath = carbonEnv.storePath
     val tableIdentifier =
       AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
         dbName.toLowerCase, tableName.toLowerCase)
     catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
-      locksToBeAcquired foreach {
+       locksToBeAcquired foreach {
         lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-
+       val carbonTable = catalog
+         .lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation].metaData.carbonTable
+      if (null != carbonTable) {
+        // clear driver B-tree and dictionary cache
+        ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
+      }
       CarbonEnv.getInstance(sparkSession).carbonMetastore
           .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
     } catch {
       case ex: Exception =>
         LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
+        sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}")
     } finally {
       if (carbonLocks.nonEmpty) {
         val unlocked = carbonLocks.forall(_.unlock())
         if (unlocked) {
           logInfo("Table MetaData Unlocked Successfully")
+          // deleting any remaining files.
+          val metadataFilePath = CarbonStorePath
+            .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
+          val fileType = FileFactory.getFileType(metadataFilePath)
+          if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+            val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+            CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
+          }
         }
       }
     }


Mime
View raw message