From: ravipesala@apache.org
To: commits@carbondata.apache.org
Date: Tue, 14 Nov 2017 16:59:41 -0000
Subject: [3/7] carbondata git commit: [CARBONDATA-1573] [Integration] Support Database Location Configuration while Creating Database / Support Creation of Carbon Table in the Database Location

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 2671aad..c7db436 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -70,7 +70,6 @@ case class CarbonDictionaryDecoder(

   override def doExecute(): RDD[InternalRow] = {
     attachTree(this, "execute") {
-      val storePath = CarbonEnv.getInstance(sparkSession).storePath
       val absoluteTableIdentifiers = relations.map { relation =>
         val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
         (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -81,7 +80,7 @@ case class CarbonDictionaryDecoder(
         child.execute().mapPartitions { iter =>
           val cacheProvider: CacheProvider = CacheProvider.getInstance
           val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-            cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+            cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
           val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
             forwardDictionaryCache)
           val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
@@ -124,7 +123,6 @@ case class CarbonDictionaryDecoder(
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-    val storePath = CarbonEnv.getInstance(sparkSession).storePath
     val absoluteTableIdentifiers = relations.map { relation =>
       val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
       (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -133,9 +131,9 @@ case class CarbonDictionaryDecoder(
     if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
       val cacheProvider: CacheProvider = CacheProvider.getInstance
       val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
       val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryWrapper(absoluteTableIdentifiers,
-        forwardDictionaryCache, storePath)
+        forwardDictionaryCache)

       val exprs = child.output.map { exp =>
         ExpressionCanonicalizer.execute(BindReferences.bindReference(exp, child.output))
@@ -252,7 +250,7 @@ case class CarbonDictionaryDecoder(
       if (f._2 != null) {
         try {
           cache.get(new DictionaryColumnUniqueIdentifier(
-            atiMap(f._1).getCarbonTableIdentifier,
+            atiMap(f._1),
             f._2, f._3.getDataType,
             CarbonStorePath.getCarbonTablePath(atiMap(f._1))))
         } catch {
@@ -266,33 +264,31 @@ case class CarbonDictionaryDecoder(
   }

   private def getDictionaryWrapper(atiMap: Map[String, AbsoluteTableIdentifier],
-      cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary], storePath: String) = {
+      cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
     val allDictIdentifiers = new ArrayBuffer[DictionaryColumnUniqueIdentifier]()
     val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryColumnIds.map {
       case (tableName, columnIdentifier, carbonDimension) =>
         if (columnIdentifier != null) {
           try {
-            val (newCarbonTableIdentifier, newColumnIdentifier) =
+            val (newAbsoluteTableIdentifier, newColumnIdentifier) =
               if (null != carbonDimension.getColumnSchema.getParentColumnTableRelations &&
                   !carbonDimension
                     .getColumnSchema.getParentColumnTableRelations.isEmpty) {
-                (QueryUtil.getTableIdentifierForColumn(carbonDimension),
+                (QueryUtil.getTableIdentifierForColumn(carbonDimension, atiMap(tableName)),
                   new ColumnIdentifier(carbonDimension.getColumnSchema
                     .getParentColumnTableRelations.get(0).getColumnId,
                     carbonDimension.getColumnProperties,
                     carbonDimension.getDataType))
               } else {
-                (atiMap(tableName).getCarbonTableIdentifier, columnIdentifier)
+                (atiMap(tableName), columnIdentifier)
               }
             val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(
-              newCarbonTableIdentifier,
+              newAbsoluteTableIdentifier,
               newColumnIdentifier,
               carbonDimension.getDataType,
               CarbonStorePath
-                .getCarbonTablePath(atiMap(tableName).getStorePath, newCarbonTableIdentifier))
+                .getCarbonTablePath(newAbsoluteTableIdentifier))
             allDictIdentifiers += dictionaryColumnUniqueIdentifier
-            new ForwardDictionaryWrapper(
-              storePath,
-              dictionaryColumnUniqueIdentifier)
+            new ForwardDictionaryWrapper(dictionaryColumnUniqueIdentifier)
           } catch {
             case _: Throwable => null
           }
@@ -300,7 +296,7 @@ case class CarbonDictionaryDecoder(
           null
         }
     }
-    val dictionaryLoader = new DictionaryLoader(storePath, allDictIdentifiers.toList)
+    val dictionaryLoader = new DictionaryLoader(allDictIdentifiers.toList)
     dicts.foreach { dict =>
       if (dict != null) {
         dict.setDictionaryLoader(dictionaryLoader)
@@ -467,7 +463,6 @@ class CarbonDecoderRDD(
     aliasMap: CarbonAliasDecoderRelation,
     prev: RDD[InternalRow],
     output: Seq[Attribute],
-    storePath: String,
     serializedTableInfo: Array[Byte])
   extends CarbonRDDWithTableInfo[InternalRow](prev, serializedTableInfo) {
@@ -516,7 +511,7 @@ class CarbonDecoderRDD(

       val cacheProvider: CacheProvider = CacheProvider.getInstance
       val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
       val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
         forwardDictionaryCache)
       val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
@@ -559,7 +554,7 @@ class CarbonDecoderRDD(
       if (f._2 != null) {
         try {
           cache.get(new DictionaryColumnUniqueIdentifier(
-            atiMap(f._1).getCarbonTableIdentifier,
+            atiMap(f._1),
             f._2, f._3.getDataType,
             CarbonStorePath.getCarbonTablePath(atiMap(f._1))))
         } catch {
@@ -578,10 +573,9 @@ class CarbonDecoderRDD(
 /**
  * It is a wrapper around Dictionary, it is a work around to keep the dictionary serializable in
  * case of codegen
- * @param storePath
+ * @param dictIdentifier Dictionary column unique identifier
  */
 class ForwardDictionaryWrapper(
-    val storePath: String,
     dictIdentifier: DictionaryColumnUniqueIdentifier) extends Serializable {

   var dictionary: Dictionary = null
@@ -610,8 +604,8 @@ class ForwardDictionaryWrapper(
 /**
  * It is Dictionary Loader class to load all dictionaries at a time instead of one by one.
  */
-class DictionaryLoader(storePath: String,
-    allDictIdentifiers: List[DictionaryColumnUniqueIdentifier]) extends Serializable {
+class DictionaryLoader(allDictIdentifiers: List[DictionaryColumnUniqueIdentifier])
+  extends Serializable {

   var isDictionaryLoaded = false

@@ -621,7 +615,7 @@ class DictionaryLoader(storePath: String,
     if (!isDictionaryLoaded) {
       val cacheProvider: CacheProvider = CacheProvider.getInstance
       val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
+        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
       allDicts = forwardDictionaryCache.getAll(allDictIdentifiers.asJava)
       isDictionaryLoaded = true
       val dictionaryTaskCleaner = TaskContext.get

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index de01c8d..fba590e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -222,10 +222,21 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider

     // check "tablePath" option
     val tablePathOption = parameters.get("tablePath")
+    val dbName: String = parameters.getOrElse("dbName",
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+    val tableOption: Option[String] = parameters.get("tableName")
+    if (tableOption.isEmpty) {
+      throw new CarbonStreamException("Table creation failed. Table name is not specified")
+    }
+    val tableName = tableOption.get.toLowerCase()
+    if (tableName.contains(" ")) {
+      throw new CarbonStreamException("Table creation failed. Table name cannot contain blank " +
+        "space")
+    }
     if (tablePathOption.isDefined) {
       val sparkSession = sqlContext.sparkSession
       val identifier: AbsoluteTableIdentifier =
-        AbsoluteTableIdentifier.fromTablePath(tablePathOption.get)
+        AbsoluteTableIdentifier.from(tablePathOption.get, dbName, tableName)
       val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
         createCarbonRelation(parameters, identifier, sparkSession).tableMeta.carbonTable
@@ -303,18 +314,20 @@ object CarbonSource {
     val tableName: String = properties.getOrElse("tableName", "").toLowerCase
     val model = createTableInfoFromParams(properties, dataSchema, dbName, tableName)
     val tableInfo: TableInfo = TableNewProcessor(model)
-    val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName
+    val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+      CarbonEnv.getInstance(sparkSession).storePath)
+    val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
     val schemaEvolutionEntry = new SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
     tableInfo.getFactTable.getSchemaEvalution.
       getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
     val map = if (metaStore.isReadFromHiveMetaStore) {
-      val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+      val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
       val schemaMetadataPath =
         CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
       tableInfo.setMetaDataFilepath(schemaMetadataPath)
-      tableInfo.setStorePath(tableIdentifier.getStorePath)
+      tableInfo.setTablePath(tableIdentifier.getTablePath)
       CarbonUtil.convertToMultiStringMap(tableInfo)
     } else {
       metaStore.saveToDisk(tableInfo, tablePath)
@@ -322,6 +335,7 @@ object CarbonSource {
     }
     properties.foreach(e => map.put(e._1, e._2))
     map.put("tablePath", tablePath)
+    map.put("dbname", dbName)
     map.asScala.toMap
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
index f5c6cba..197b23b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
@@ -39,9 +39,11 @@ case class CarbonCreateTableCommand(
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
     val storePath = CarbonEnv.getInstance(sparkSession).storePath
     CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables(storePath)
+      checkSchemasModifiedTimeAndReloadTables()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
+    val dbLocation = GetDB.getDatabaseLocation(cm.databaseName, sparkSession, storePath)
+    val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + cm.tableName
     val tbName = cm.tableName
     val dbName = cm.databaseName
     LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
@@ -70,11 +72,10 @@ case class CarbonCreateTableCommand(
         sys.error(s"Table [$tbName] already exists under database [$dbName]")
       }
     } else {
-      val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
+      val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tbName)
       // Add Database to catalog and persist
       val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val tablePath = tableIdentifier.getTablePath
-      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
+      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
       if (createDSTable) {
         try {
           val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
@@ -89,10 +90,9 @@ case class CarbonCreateTableCommand(
                s""""$tablePath"$carbonSchemaString) """)
         } catch {
           case e: Exception =>
-            val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
             // call the drop table to delete the created table.
             CarbonEnv.getInstance(sparkSession).carbonMetastore
-              .dropTable(tablePath, identifier)(sparkSession)
+              .dropTable(tableIdentifier)(sparkSession)

             LOGGER.audit(s"Table creation with Database name [$dbName] " +
                          s"and Table name [$tbName] failed")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
index 1bf17b3..0343393 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.hive.CarbonRelation

 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
@@ -53,14 +54,16 @@ case class CarbonDropTableCommand(
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
     val catalog = carbonEnv.carbonMetastore
-    val tableIdentifier =
-      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
-        dbName.toLowerCase, tableName.toLowerCase)
-    catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
+    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+      CarbonEnv.getInstance(sparkSession).storePath)
+    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
+    val absoluteTableIdentifier = AbsoluteTableIdentifier
+      .from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
+    catalog.checkSchemasModifiedTimeAndReloadTables()
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
       locksToBeAcquired foreach {
-        lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
+        lock => carbonLocks += CarbonLockUtil.getLockObject(absoluteTableIdentifier, lock)
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
       val carbonTable: Option[CarbonTable] =
@@ -98,7 +101,7 @@ case class CarbonDropTableCommand(
             sparkSession)
         OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
         CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
+          .dropTable(absoluteTableIdentifier)(sparkSession)

         // fires the event after dropping main table
         val dropTablePostEvent: DropTablePostEvent =
@@ -127,8 +130,10 @@ case class CarbonDropTableCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
     val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val tableIdentifier =
-      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
+    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+      CarbonEnv.getInstance(sparkSession).storePath)
+    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
+    val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
     val metadataFilePath =
       CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath
     val fileType = FileFactory.getFileType(metadataFilePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index dc3b1ae..66f2756 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.hive.CarbonRelation

 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
@@ -63,14 +64,16 @@ case class CarbonDropDataMapCommand(
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
     val catalog = carbonEnv.carbonMetastore
+    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+      CarbonEnv.getInstance(sparkSession).storePath)
+    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
     val tableIdentifier =
-      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
-        dbName.toLowerCase, tableName.toLowerCase)
-    catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
+      AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
+    catalog.checkSchemasModifiedTimeAndReloadTables()
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
       locksToBeAcquired foreach {
-        lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
+        lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
       }
       LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
       val carbonTable: Option[CarbonTable] =
@@ -140,8 +143,10 @@ case class CarbonDropDataMapCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
     val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val tableIdentifier =
-      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
+    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+      CarbonEnv.getInstance(sparkSession).storePath)
+    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
+    val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
     DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index f87e734..947cea1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -71,7 +71,7 @@ case class AlterTableCompactionCommand(
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
     carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
     carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
+    carbonLoadModel.setTablePath(relation.tableMeta.carbonTable.getTablePath)

     var storeLocation = CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
@@ -131,7 +131,7 @@ case class AlterTableCompactionCommand(
       // Just launch job to merge index and return
       CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
         carbonLoadModel.getLoadMetadataDetails.asScala.map(_.getLoadName),
-        carbonLoadModel.getStorePath,
+        carbonLoadModel.getTablePath,
         carbonTable)
       return
     }
@@ -172,7 +172,7 @@ case class AlterTableCompactionCommand(
     } else {
       // normal flow of compaction
       val lock = CarbonLockFactory
-        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
           LockUsage.COMPACTION_LOCK
         )
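The hunks above all apply the same migration: a table's location is now resolved through the database location (which may be user-configured) instead of being hard-wired to <storePath>/<db>/<table>, and locks, dictionary caches and the metastore are keyed on the resulting AbsoluteTableIdentifier rather than on a bare store path. A minimal sketch of the pattern, using only calls that appear in this diff (resolveIdentifier itself is a hypothetical helper, not part of the patch):

    // Sketch only: condenses the per-command boilerplate shown in the hunks above.
    def resolveIdentifier(dbName: String, tableName: String)
        (sparkSession: SparkSession): AbsoluteTableIdentifier = {
      // The database may live at a user-configured location, so resolve it first.
      val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
        CarbonEnv.getInstance(sparkSession).storePath)
      val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
      // The identifier is now built from the table path, not from storePath.
      AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
    }

    // Downstream, the identifier (not a path) is the key:
    //   CarbonLockUtil.getLockObject(identifier, lock)
    //   cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)  // storePath argument dropped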
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
index 1b16b88..8b0dab7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
@@ -36,10 +36,14 @@ case class CleanFilesCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     if (forceTableClean) {
+      val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
+      val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
+        CarbonEnv.getInstance(sparkSession).storePath)
+      // TODO: TAABLEPATH
       CarbonStore.cleanFiles(
-        GetDB.getDatabaseName(databaseNameOp, sparkSession),
+        dbName,
         tableName,
-        CarbonEnv.getInstance(sparkSession).storePath,
+        databaseLocation,
         null,
         forceTableClean)
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 3853b5f..777c169 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -192,7 +192,7 @@ case class LoadTableCommand(
     } finally {
       // Once the data load is successful delete the unwanted partition files
       try {
-        val partitionLocation = table.getStorePath + "/partition/" +
+        val partitionLocation = relation.tableMeta.storePath + "/partition/" +
           table.getDatabaseName + "/" + table.getFactTableName + "/"
         val fileType = FileFactory.getFileType(partitionLocation)
@@ -231,7 +231,7 @@ case class LoadTableCommand(
     val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       .getCarbonTableIdentifier
     val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(carbonLoadModel.getStorePath, carbonTableIdentifier)
+      .getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
     val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
     val dimensions = carbonTable.getDimensionByTableName(
       carbonTable.getFactTableName).asScala.toArray
@@ -245,7 +245,7 @@ case class LoadTableCommand(
         dimensions,
         carbonLoadModel,
         sparkSession.sqlContext,
-        carbonLoadModel.getStorePath,
+        carbonLoadModel.getTablePath,
         dictFolderPath)
     }
     if (!StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
@@ -253,7 +253,7 @@ case class LoadTableCommand(
       GlobalDictionaryUtil
         .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
           carbonLoadModel,
-          carbonLoadModel.getStorePath,
+          carbonLoadModel.getTablePath,
           carbonTableIdentifier,
           dictFolderPath,
           dimensions,
@@ -289,7 +289,7 @@ case class LoadTableCommand(
       }
       CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
         carbonLoadModel,
-        carbonLoadModel.getStorePath,
+        carbonLoadModel.getTablePath,
         columnar,
         partitionStatus,
         server,
@@ -332,11 +332,11 @@ case class LoadTableCommand(
       GlobalDictionaryUtil.generateGlobalDictionary(
         sparkSession.sqlContext,
         carbonLoadModel,
-        carbonLoadModel.getStorePath,
+        carbonLoadModel.getTablePath,
         dictionaryDataFrame)
       CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
         carbonLoadModel,
-        carbonLoadModel.getStorePath,
+        carbonLoadModel.getTablePath,
         columnar,
         partitionStatus,
         None,
@@ -351,8 +351,7 @@ case class LoadTableCommand(
       model: DictionaryLoadModel,
       noDictDimension: Array[CarbonDimension]): Unit = {
     val sparkSession = sqlContext.sparkSession
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
-      model.table)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.table)

     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     // read TableInfo
@@ -374,11 +373,12 @@ case class LoadTableCommand(
       tableInfo, entry, carbonTablePath.getPath)(sparkSession)

     // update the schema modified time
-    metastore.updateAndTouchSchemasUpdatedTime(model.hdfsLocation)
+    metastore.updateAndTouchSchemasUpdatedTime()
+    val identifier = model.table.getCarbonTableIdentifier

     // update CarbonDataLoadSchema
-    val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName),
-      model.table.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
+    val carbonTable = metastore.lookupRelation(Option(identifier.getDatabaseName),
+      identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
     carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 0c39dd4..a52008a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -76,14 +76,10 @@ object DeleteExecution {
       .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
       asInstanceOf[CarbonRelation]
-    val storeLocation = relation.tableMeta.storePath
-    val absoluteTableIdentifier: AbsoluteTableIdentifier = new
-        AbsoluteTableIdentifier(storeLocation,
-          relation.tableMeta.carbonTableIdentifier)
-    val tablePath = CarbonStorePath.getCarbonTablePath(
-      storeLocation,
-      absoluteTableIdentifier.getCarbonTableIdentifier)
-    val factPath = tablePath.getFactDir
+    val absoluteTableIdentifier = relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
+    val carbonTablePath = CarbonStorePath
+      .getCarbonTablePath(absoluteTableIdentifier)
+    val factPath = carbonTablePath.getFactDir
     val carbonTable = relation.tableMeta.carbonTable
     var deleteStatus = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index 764deb7..a898822 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -60,7 +60,7 @@ private[sql] case class ProjectForDeleteCommand(
     OperationListenerBus.getInstance.fireEvent(deleteFromTablePreEvent, operationContext)

     val metadataLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
         LockUsage.METADATA_LOCK)
     var lockStatus = false
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index e48693b..549c58f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -67,7 +67,7 @@ private[sql] case class ProjectForUpdateCommand(
     OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext)
     val metadataLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
         LockUsage.METADATA_LOCK)
     var lockStatus = false
     // get the current time stamp which should be same for delete and update.
@@ -83,9 +83,7 @@ private[sql] case class ProjectForUpdateCommand(
     else {
       throw new Exception("Table is locked for updation. Please try after some time")
     }
-    val tablePath = CarbonStorePath.getCarbonTablePath(
-      carbonTable.getStorePath,
-      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
+    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     // Get RDD.
     dataSet = if (isPersistEnabled) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
index 9b16060..acd9bd3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -66,8 +66,8 @@ case class AlterTableDropCarbonPartitionCommand(
     val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation]
     val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
-    val storePath = relation.tableMeta.storePath
-    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+    val tablePath = relation.tableMeta.tablePath
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
     if (relation == null) {
       sys.error(s"Table $dbName.$tableName does not exist")
     }
@@ -101,14 +101,14 @@ case class AlterTableDropCarbonPartitionCommand(
       sys.error(s"Dropping range interval partition isn't support yet!")
     }
     partitionInfo.dropPartition(partitionIndex)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     // read TableInfo
     val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)

     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
-      dbName, tableName, storePath)
+      dbName, tableName, tablePath)
     val tableSchema = wrapperTableInfo.getFactTable
     tableSchema.setPartitionInfo(partitionInfo)
     wrapperTableInfo.setFactTable(tableSchema)
@@ -118,10 +118,10 @@ case class AlterTableDropCarbonPartitionCommand(
     thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
       .setTime_stamp(System.currentTimeMillis)
     carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
-      dbName, tableName, storePath)
+      dbName, tableName, tablePath)
     CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
     // update the schema modified time
-    carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+    carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     // sparkSession.catalog.refreshTable(tableName)
     Seq.empty
   }
@@ -152,7 +152,7 @@ case class AlterTableDropCarbonPartitionCommand(
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
     carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
     carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+    carbonLoadModel.setTablePath(relation.tableMeta.tablePath)
     val loadStartTime = CarbonUpdateUtil.readCurrentTime
     carbonLoadModel.setFactTimeStamp(loadStartTime)
     alterTableDropPartition(
@@ -224,7 +224,7 @@ case class AlterTableDropCarbonPartitionCommand(
     for (thread <- threadArray) {
       thread.join()
     }
-    val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
+    val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
       carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
     val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
     refresher.refreshSegments(validSegments.asJava)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
index c3a918c..0973226 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -70,11 +70,11 @@ case class AlterTableSplitCarbonPartitionCommand(
     val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation]
     val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
-    val storePath = relation.tableMeta.storePath
+    val tablePath = relation.tableMeta.tablePath
     if (relation == null) {
       sys.error(s"Table $dbName.$tableName does not exist")
     }
-    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
     if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       sys.error(s"Alter table failed. table not found: $dbName.$tableName")
@@ -95,13 +95,13 @@ case class AlterTableSplitCarbonPartitionCommand(
     updatePartitionInfo(partitionInfo, partitionIds)

-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     // read TableInfo
     val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)

     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
-      dbName, tableName, storePath)
+      dbName, tableName, tablePath)
     val tableSchema = wrapperTableInfo.getFactTable
     tableSchema.setPartitionInfo(partitionInfo)
     wrapperTableInfo.setFactTable(tableSchema)
@@ -109,10 +109,10 @@ case class AlterTableSplitCarbonPartitionCommand(
     val thriftTable = schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo,
       dbName, tableName)
     carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
-      dbName, tableName, storePath)
+      dbName, tableName, tablePath)
     CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
     // update the schema modified time
-    carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+    carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     sparkSession.sessionState.catalog.refreshTable(TableIdentifier(tableName, Option(dbName)))
     Seq.empty
   }
@@ -153,14 +153,14 @@ case class AlterTableSplitCarbonPartitionCommand(
     val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation]
-    val storePath = relation.tableMeta.storePath
+    val tablePath = relation.tableMeta.tablePath
     val table = relation.tableMeta.carbonTable
     val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
     val dataLoadSchema = new CarbonDataLoadSchema(table)
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
     carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
     carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setStorePath(storePath)
+    carbonLoadModel.setTablePath(tablePath)
     val loadStartTime = CarbonUpdateUtil.readCurrentTime
     carbonLoadModel.setFactTimeStamp(loadStartTime)
     alterTableSplitPartition(
@@ -232,7 +232,7 @@ case class AlterTableSplitCarbonPartitionCommand(
     threadArray.foreach {
       thread => thread.join()
     }
-    val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
+    val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
       carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
     val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
     refresher.refreshSegments(validSegments.asJava)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index d693061..3193310 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -332,15 +332,15 @@ object PreAggregateUtil {
     locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
     // get the latest carbon table and check for column existence
     // read the latest schema file
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-      carbonTable.getCarbonTableIdentifier)
+    val carbonTablePath = CarbonStorePath
+      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         dbName,
         tableName,
-        carbonTable.getStorePath)
+        carbonTable.getTablePath)
     numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
     if (wrapperTableInfo.getDataMapSchemaList.asScala.
       exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
@@ -399,7 +399,7 @@ object PreAggregateUtil {
     val acquiredLocks = ListBuffer[ICarbonLock]()
     try {
       locksToBeAcquired.foreach { lock =>
-        acquiredLocks += CarbonLockUtil.getLockObject(table.getCarbonTableIdentifier, lock)
+        acquiredLocks += CarbonLockUtil.getLockObject(table.getAbsoluteTableIdentifier, lock)
       }
       acquiredLocks.toList
     } catch {
@@ -439,13 +439,12 @@ object PreAggregateUtil {
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
     carbonTable.getTableLastUpdatedTime
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-      carbonTable.getCarbonTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
     if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
       metastore
-        .revertTableSchemaForPreAggCreationFailure(carbonTable.getCarbonTableIdentifier,
-          thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+        .revertTableSchemaForPreAggCreationFailure(carbonTable.getAbsoluteTableIdentifier,
+          thriftTable)(sparkSession)
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 7cc43d2..2132131 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -65,26 +65,25 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
+      val carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
      val thriftTableInfo: TableInfo =
        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
      val wrapperTableInfo = schemaConverter
        .fromExternalToWrapperTableInfo(thriftTableInfo,
          dbName,
          tableName,
-          carbonTable.getStorePath)
+          carbonTable.getTablePath)
      newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
        dbName,
        wrapperTableInfo,
        carbonTablePath,
        carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath,
        sparkSession.sparkContext).process
      // generate dictionary files for the newly added columns
      new AlterTableAddColumnRDD(sparkSession.sparkContext,
        newCols,
-        carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath).collect()
+        carbonTable.getAbsoluteTableIdentifier).collect()
      timeStamp = System.currentTimeMillis
      val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
      schemaEvolutionEntry.setTimeStamp(timeStamp)
@@ -105,8 +104,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
        LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
        new AlterTableDropColumnRDD(sparkSession.sparkContext,
          newCols,
-          carbonTable.getCarbonTableIdentifier,
-          carbonTable.getStorePath).collect()
+          carbonTable.getAbsoluteTableIdentifier).collect()
        AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
      }
      sys.error(s"Alter table add operation failed: ${e.getMessage}")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 023e061..e44899e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -74,8 +74,8 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
        sys.error(s"Invalid Column: $columnName")
      }
      // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
+      val carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
      // maintain the added column for schema evolution history
      var addColumnSchema: ColumnSchema = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 0b737bf..dae2d7b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -110,8 +110,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPreEvent, operationContext)

       // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
+      val carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       val tableInfo: org.apache.carbondata.format.TableInfo =
         metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       // maintain the deleted columns for schema evolution history
@@ -138,8 +138,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       // delete dictionary files for dictionary column and clear dictionary cache from memory
       new AlterTableDropColumnRDD(sparkSession.sparkContext,
         dictionaryColumns,
-        carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath).collect()
+        carbonTable.getAbsoluteTableIdentifier).collect()

       // event will be fired before dropping the columns
       val alterTableDropColumnPostEvent: AlterTableDropColumnPostEvent =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index c2e5cf0..f1cce13 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -28,7 +28,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -85,7 +85,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         .asInstanceOf[CarbonRelation].tableMeta
       carbonTable = tableMeta.carbonTable
       // invalid data map for the old table, see CARBON-1690
-      val oldTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath)
+      val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
       // get the latest carbon table and check for column existence
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier)
@@ -106,6 +106,21 @@ private[sql] case class CarbonAlterTableRenameCommand(
       schemaEvolutionEntry.setTime_stamp(timeStamp)
       renameBadRecords(oldTableName, newTableName, oldDatabaseName)
       val fileType = FileFactory.getFileType(tableMetadataFile)
+      val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
+        newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
+      var newTablePath = CarbonUtil.getNewTablePath(carbonTablePath, newTableIdentifier)
+
+      metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
+      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+        .runSqlHive(
+          s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
+      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+        .runSqlHive(
+          s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
+          s"('tableName'='$newTableName', " +
+          s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+      // changed the rename order to deal with situation when carbon table and hive table
+      // will point to the same tablePath
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
         val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
           .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
@@ -115,23 +130,12 @@ private[sql] case class CarbonAlterTableRenameCommand(
             newTableName)
         if (!rename) {
           sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
         }
       }
-      val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
-        newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
-      val newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier,
+      newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
         tableInfo,
         schemaEvolutionEntry,
         tableMeta.tablePath)(sparkSession)
-      metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
-      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
-        .runSqlHive(
-          s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
-      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
-        .runSqlHive(
-          s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
-          s"('tableName'='$newTableName', " +
-          s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
       val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
         carbonTable,
         alterTableRenameModel,
@@ -150,7 +154,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         AlterTableUtil
           .revertRenameTableChanges(oldTableIdentifier,
             newTableName,
-            carbonTable.getStorePath,
+            carbonTable.getTablePath,
             carbonTable.getCarbonTableIdentifier.getTableId,
             timeStamp)(
             sparkSession)
@@ -167,7 +171,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         locksToBeAcquired,
         oldDatabaseName,
         newTableName,
-        carbonTable.getStorePath)
+        carbonTable.getTablePath)
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index c6ca950..c126b25 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -102,7 +102,6 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       CarbonAliasDecoderRelation(),
       rdd,
       output,
-      CarbonEnv.getInstance(SparkSession.getActiveSession.get).storePath,
       table.carbonTable.getTableInfo.serialize())
   }
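Beyond the signature changes, the rename hunk above also reorders the steps: Hive metadata (the table name plus the serde properties carrying tableName, dbName and tablePath) is now updated before the folder rename and schema rewrite, which, per the in-diff comment, handles the case where the carbon table and the Hive table point to the same tablePath. Condensed for review (a sketch, not the patch itself, with metadataHive standing for sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive):

    // New order: update Hive first, then rename on disk and rewrite the schema.
    val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
      newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
    var newTablePath = CarbonUtil.getNewTablePath(carbonTablePath, newTableIdentifier)
    metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
    metadataHive.runSqlHive(
      s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
    metadataHive.runSqlHive(
      s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
      s"('tableName'='$newTableName', 'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
    // Only afterwards: renameForce the table folder and call updateTableSchemaForAlter,
    // which recomputes newTablePath from the new identifier.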
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index e39ba73..ef2e0a5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand
 import org.apache.spark.sql.execution.command.schema._
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}

-import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException

 /**
@@ -77,7 +76,6 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         _, child: LogicalPlan, overwrite, _) =>
         ExecutedCommandExec(LoadTableByInsertCommand(relation, child, overwrite.enabled)) :: Nil
       case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
-        CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath)
         ExecutedCommandExec(createDb) :: Nil
       case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
         ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 0343402..6d80a26 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -23,11 +23,10 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer

-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.datasources.LogicalRelation

 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -39,12 +38,11 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{schema, AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 0343402..6d80a26 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -23,11 +23,10 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -39,12 +38,11 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{schema, AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.format
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -192,11 +190,11 @@ class CarbonFileMetastore extends CarbonMetaStore {
   private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
     val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = identifier.getCarbonTableIdentifier.getTableName
-    val storePath = identifier.getStorePath
+    val tablePath = identifier.getTablePath
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
       tableName.toLowerCase(), UUID.randomUUID().toString)
     val carbonTablePath =
-      CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+      CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
     val tableMetadataFile = carbonTablePath.getSchemaFilePath
     val fileType = FileFactory.getFileType(tableMetadataFile)
     if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
@@ -204,16 +202,16 @@ class CarbonFileMetastore extends CarbonMetaStore {
       val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl
       val wrapperTableInfo = schemaConverter
-        .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+        .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
       val schemaFilePath = CarbonStorePath
-        .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
-      wrapperTableInfo.setStorePath(storePath)
+        .getCarbonTablePath(tablePath, carbonTableIdentifier).getSchemaFilePath
+      wrapperTableInfo.setTablePath(tablePath)
       wrapperTableInfo
         .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
       val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
       val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
-        identifier.getStorePath,
+        identifier.getTablePath,
         identifier.getTablePath,
         carbonTable)
       metadata.tablesMeta += tableMeta
@@ -237,16 +235,19 @@ class CarbonFileMetastore extends CarbonMetaStore {
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
       tablePath: String) (sparkSession: SparkSession): String = {
-    val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val absoluteTableIdentifier = new AbsoluteTableIdentifier(tablePath, oldTableIdentifier)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
+    val oldCarbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
+    val newAbsoluteTableIdentifier = new AbsoluteTableIdentifier(CarbonUtil
+      .getNewTablePath(oldCarbonTablePath, newTableIdentifier), newTableIdentifier)
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         newTableIdentifier.getDatabaseName,
         newTableIdentifier.getTableName,
-        absoluteTableIdentifier.getStorePath)
+        newAbsoluteTableIdentifier.getTablePath)
     val identifier = new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
      newTableIdentifier.getTableName,
@@ -254,10 +255,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
       identifier)
-    addTableCache(wrapperTableInfo,
-      AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
-        newTableIdentifier.getDatabaseName,
-        newTableIdentifier.getTableName))
+    addTableCache(wrapperTableInfo, newAbsoluteTableIdentifier)
     path
   }
 
@@ -266,47 +264,44 @@ class CarbonFileMetastore extends CarbonMetaStore {
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
-   * @param tablePath
    * @param sparkSession
    */
   def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      tablePath: String)(sparkSession: SparkSession): String = {
-    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+      absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         carbonTableIdentifier.getDatabaseName,
         carbonTableIdentifier.getTableName,
-        tableIdentifier.getStorePath)
+        absoluteTableIdentifier.getTablePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
-    wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+    wrapperTableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
     val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
-      tableIdentifier.getCarbonTableIdentifier)
-    addTableCache(wrapperTableInfo, tableIdentifier)
+      absoluteTableIdentifier.getCarbonTableIdentifier)
+    addTableCache(wrapperTableInfo, absoluteTableIdentifier)
     path
   }
 
-  override def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier:
-  CarbonTableIdentifier,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      tablePath: String)(sparkSession: SparkSession): String = {
-    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+  override def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier:
+  AbsoluteTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo)
+  (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
-        carbonTableIdentifier.getDatabaseName,
-        carbonTableIdentifier.getTableName,
-        tableIdentifier.getStorePath)
+        absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+        absoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+        absoluteTableIdentifier.getTablePath)
     val childSchemaList = wrapperTableInfo.getDataMapSchemaList
     childSchemaList.remove(childSchemaList.size() - 1)
-    wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+    wrapperTableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
     val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
-      tableIdentifier.getCarbonTableIdentifier)
-    addTableCache(wrapperTableInfo, tableIdentifier)
+      absoluteTableIdentifier.getCarbonTableIdentifier)
+    addTableCache(wrapperTableInfo, absoluteTableIdentifier)
     path
   }
 
@@ -323,8 +318,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val tableName = tableInfo.getFactTable.getTableName
     val thriftTableInfo = schemaConverter
      .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
-    tableInfo.setStorePath(identifier.getStorePath)
+    val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
+    tableInfo.setTablePath(identifier.getTablePath)
     createSchemaThriftFile(tableInfo,
       thriftTableInfo,
       identifier.getCarbonTableIdentifier)
@@ -335,19 +330,18 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * Generates schema string from TableInfo
    */
   override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
-      tablePath: String): String = {
-    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+      absoluteTableIdentifier: AbsoluteTableIdentifier): String = {
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
     val schemaMetadataPath =
       CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(tableIdentifier.getStorePath)
+    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
     val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
     tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
     removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    addTableCache(tableInfo, tableIdentifier)
+    addTableCache(tableInfo, absoluteTableIdentifier)
     CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
   }
 
@@ -362,7 +356,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
       thriftTableInfo: TableInfo,
       carbonTableIdentifier: CarbonTableIdentifier): String = {
     val carbonTablePath = CarbonStorePath.
-      getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
+      getCarbonTablePath(tableInfo.getTablePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
@@ -374,7 +368,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     thriftWriter.open(FileWriteOperation.OVERWRITE)
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime())
     carbonTablePath.getPath
   }
 
@@ -384,7 +378,7 @@
     CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
     removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
+    val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getTablePath,
       absoluteTableIdentifier.getTablePath,
       CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
     metadata.tablesMeta += tableMeta
@@ -424,16 +418,16 @@
   }
 
   def updateMetadataByThriftTable(schemaFilePath: String,
-      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
+      tableInfo: TableInfo, dbName: String, tableName: String, tablePath: String): Unit = {
     tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
       .setTime_stamp(System.currentTimeMillis())
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
     wrapperTableInfo
       .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-    wrapperTableInfo.setStorePath(storePath)
+    wrapperTableInfo.setTablePath(tablePath)
     updateMetadataByWrapperTable(wrapperTableInfo)
   }
 
@@ -446,17 +440,17 @@
       FileFactory.isFileExist(tablePath, fileType)
     } catch {
       case e: Exception =>
-       false
+        false
     }
   }
 
-  def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
+  def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)
     (sparkSession: SparkSession) {
-    val dbName = tableIdentifier.database.get
-    val tableName = tableIdentifier.table
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
+    val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
+    val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
+      .getMetadataDirectoryPath
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
@@ -467,21 +461,28 @@
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
       // while drop we should refresh the schema modified time so that if any thing has changed
       // in the other beeline need to update.
-      checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+      checkSchemasModifiedTimeAndReloadTables()
       removeTableFromMetadata(dbName, tableName)
-
-      updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
+      updateSchemasUpdatedTime(touchSchemaFileSystemTime())
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
       // discard cached table info in cachedDataSourceTables
+      val tableIdentifier = TableIdentifier(tableName, Option(dbName))
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-      DataMapStoreManager.getInstance().clearDataMaps(identifier)
+      DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
     }
   }
 
-  private def getTimestampFileAndType(basePath: String) = {
+  private def getTimestampFileAndType() = {
+    var basePath = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
+        CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT)
+    basePath = CarbonUtil.checkAndAppendFileSystemURIScheme(basePath)
     val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
     val timestampFileType = FileFactory.getFileType(timestampFile)
+    if (!FileFactory.isFileExist(basePath, timestampFileType)) {
+      FileFactory.mkdirs(basePath, timestampFileType)
+    }
     (timestampFile, timestampFileType)
   }
 
@@ -494,8 +495,8 @@
     tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME,
       timeStamp)
   }
 
-  def updateAndTouchSchemasUpdatedTime(basePath: String) {
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
+  def updateAndTouchSchemasUpdatedTime() {
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime())
   }
 
@@ -504,10 +505,10 @@
    *
    * @return
    */
-  private def touchSchemaFileSystemTime(basePath: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
+  private def touchSchemaFileSystemTime(): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType()
     if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      LOGGER.audit(s"Creating timestamp file for $basePath")
+      LOGGER.audit(s"Creating timestamp file for $timestampFile")
       FileFactory.createNewFile(timestampFile, timestampFileType)
     }
     FileFactory.getCarbonFile(timestampFile, timestampFileType)
@@ -518,9 +519,8 @@
       .getLastModifiedTime
   }
 
-  def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
-    val (timestampFile, timestampFileType) =
-      getTimestampFileAndType(storePath)
+  def checkSchemasModifiedTimeAndReloadTables() {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType()
     if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
       if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
         getLastModifiedTime ==
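The timestamp-file hunks above decouple the schema-modified marker from the store path: getTimestampFileAndType() now derives its base folder from the CARBON_UPDATE_SYNC_FOLDER property (falling back to its default) and creates the folder on demand, so every metastore instance agrees on one synchronisation file regardless of where individual tables live. A minimal sketch of that resolution, with illustrative constant values rather than the ones defined in CarbonCommonConstants:

// Sketch only: constants below are stand-ins, not CarbonCommonConstants.
object SchemaTimestampSketch {
  val UpdateSyncFolderDefault = "/tmp/carbondata"   // assumed default
  val SchemasModifiedTimeFile = "modifiedTime.mdt"  // assumed file name

  def timestampFile(configured: Option[String]): String = {
    // Base path comes from configuration, not from a table's storePath.
    val basePath = configured.getOrElse(UpdateSyncFolderDefault)
    s"$basePath/$SchemasModifiedTimeFile"
  }

  def main(args: Array[String]): Unit = {
    println(timestampFile(None))                  // uses the default
    println(timestampFile(Some("/shared/sync")))  // uses the configured folder
  }
}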
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index a500f00..dedaf1c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -18,7 +18,8 @@ package org.apache.spark.sql.hive
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 
@@ -33,7 +34,7 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.processing.merger.TableMeta
-import org.apache.carbondata.spark.util.CarbonSparkUtil
+import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
 
 /**
  * Metastore to store carbonschema in hive
@@ -56,7 +57,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     if (info != null) {
       val table = CarbonTable.buildFromTableInfo(info)
       val meta = new TableMeta(table.getCarbonTableIdentifier,
-        absIdentifier.getStorePath, absIdentifier.getTablePath, table)
+        absIdentifier.getTablePath, absIdentifier.getTablePath, table)
       CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
         CarbonSparkUtil.createSparkMeta(table), meta)
     } else {
@@ -70,25 +71,25 @@
     tableExists(tableIdentifier)(sparkSession)
   }
 
-  override def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
+  override def dropTable(absoluteTableIdentifier: AbsoluteTableIdentifier)
     (sparkSession: SparkSession): Unit = {
-    val dbName = tableIdentifier.database.get
-    val tableName = tableIdentifier.table
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
+    val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
-    checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+    checkSchemasModifiedTimeAndReloadTables()
     removeTableFromMetadata(dbName, tableName)
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
+    val tableIdentifier = TableIdentifier(tableName, Option(dbName))
     sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-    DataMapStoreManager.getInstance().clearDataMaps(identifier)
+    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
   }
 
-  override def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
+  override def checkSchemasModifiedTimeAndReloadTables() {
     // do nothing now
   }
 
@@ -125,14 +126,13 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       tablePath: String)
    (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
     updateHiveMetaStoreForAlter(newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
-      identifier.getStorePath,
+      tablePath,
       sparkSession,
       schemaConverter)
   }
 
@@ -142,19 +142,18 @@
    *
    * @param newTableIdentifier
    * @param thriftTableInfo
-   * @param carbonStorePath
+   * @param carbonTablePath
    * @param sparkSession
    */
   override def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      carbonStorePath: String)(sparkSession: SparkSession): String = {
+      carbonTablePath: String)(sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val identifier = AbsoluteTableIdentifier.fromTablePath(carbonStorePath)
     updateHiveMetaStoreForDataMap(newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
-      identifier.getStorePath,
+      carbonTablePath,
       sparkSession,
       schemaConverter)
   }
 
@@ -165,13 +164,14 @@
       carbonStorePath: String,
       sparkSession: SparkSession,
       schemaConverter: ThriftWrapperSchemaConverterImpl) = {
+    val tablePath = CarbonUtil.getNewTablePath(new Path(carbonStorePath), newTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, newTableIdentifier)
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         newTableIdentifier.getDatabaseName,
         newTableIdentifier.getTableName,
-        carbonStorePath)
-    wrapperTableInfo.setStorePath(carbonStorePath)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier)
+        carbonTablePath.toString)
+    wrapperTableInfo.setTablePath(carbonStorePath)
     val schemaMetadataPath =
       CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
     wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
@@ -189,16 +189,17 @@
   private def updateHiveMetaStoreForDataMap(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
-      carbonStorePath: String,
+      tablePath: String,
       sparkSession: SparkSession,
       schemaConverter: ThriftWrapperSchemaConverterImpl) = {
+    val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newTableIdentifier)
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         newTableIdentifier.getDatabaseName,
         newTableIdentifier.getTableName,
-        carbonStorePath)
-    wrapperTableInfo.setStorePath(carbonStorePath)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier)
+        newTablePath)
+    wrapperTableInfo.setTablePath(newTablePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(newTablePath, newTableIdentifier)
     val schemaMetadataPath =
       CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
     wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
@@ -207,7 +208,7 @@
     sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
     removeTableFromMetadata(dbName, tableName)
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath
+    carbonTablePath.getPath
   }
 
   /**
@@ -219,32 +220,31 @@
    */
   override def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
-      tablePath: String)
+      identifier: AbsoluteTableIdentifier)
     (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
     updateHiveMetaStoreForAlter(carbonTableIdentifier,
       carbonTableIdentifier,
       thriftTableInfo,
-      identifier.getStorePath,
+      identifier.getTablePath,
       sparkSession,
       schemaConverter)
   }
 
-  override def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier:
-  CarbonTableIdentifier,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      tablePath: String)(sparkSession: SparkSession): String = {
+  override def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier:
+  AbsoluteTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo)
+      (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val childSchemas = thriftTableInfo.dataMapSchemas
     childSchemas.remove(childSchemas.size() - 1)
+    val carbonTableIdentifier = absoluteTableIdentifier.getCarbonTableIdentifier
     updateHiveMetaStoreForAlter(carbonTableIdentifier,
       carbonTableIdentifier,
       thriftTableInfo,
-      identifier.getStorePath,
+      absoluteTableIdentifier.getTablePath,
       sparkSession,
       schemaConverter)
   }
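Across both metastores, the pattern in these hunks is the same: every AbsoluteTableIdentifier.fromTablePath/getStorePath pairing is replaced by an identifier that carries the table's own tablePath, which is what allows a table to live under a database-specific location instead of one global store. A simplified model of the identifier shape the code now threads through (a hypothetical sketch, not the real AbsoluteTableIdentifier API):

// Simplified, hypothetical model mirroring the identifiers in the diff.
final case class TableId(databaseName: String, tableName: String)

final case class AbsTableId(tablePath: String, table: TableId) {
  // Schema and metadata locations derive from the table's own path,
  // so no global storePath is consulted.
  def schemaFilePath: String = s"$tablePath/Metadata/schema"
}

object AbsTableIdDemo {
  def main(args: Array[String]): Unit = {
    val id = AbsTableId("/user/warehouse/sales/orders", TableId("sales", "orders"))
    println(id.schemaFilePath) // /user/warehouse/sales/orders/Metadata/schema
  }
}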