From commits-return-9655-archive-asf-public=cust-asf.ponee.io@carbondata.apache.org Sun Mar 4 13:33:09 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 1BD441807D5 for ; Sun, 4 Mar 2018 13:33:03 +0100 (CET) Received: (qmail 24771 invoked by uid 500); 4 Mar 2018 12:33:03 -0000 Mailing-List: contact commits-help@carbondata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.apache.org Delivered-To: mailing list commits@carbondata.apache.org Received: (qmail 24465 invoked by uid 99); 4 Mar 2018 12:33:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Mar 2018 12:33:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 70C15F63DE; Sun, 4 Mar 2018 12:33:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jackylk@apache.org To: commits@carbondata.apache.org Date: Sun, 04 Mar 2018 12:33:19 -0000 Message-Id: <6e3c7e0627c54711873fb10d1e1979cc@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [20/50] [abbrv] carbondata git commit: [CARBONDATA-2025] Unify all path construction through CarbonTablePath static method http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index 40b5cfc..753e637 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -34,7 +34,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonUtil -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.format.SchemaEvolutionEntry import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException} @@ -97,8 +97,7 @@ private[sql] case class CarbonAlterTableRenameCommand( val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier) // get the latest carbon table and check for column existence - val oldTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier) - val tableMetadataFile = oldTablePath.getPath + val tableMetadataFile = oldTableIdentifier.getTablePath val operationContext = new OperationContext // TODO: Pass new Table Path in pre-event. val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent( @@ -108,7 +107,7 @@ private[sql] case class CarbonAlterTableRenameCommand( sparkSession) OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext) val tableInfo: org.apache.carbondata.format.TableInfo = - metastore.getThriftTableInfo(oldTablePath)(sparkSession) + metastore.getThriftTableInfo(carbonTable)(sparkSession) val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis) schemaEvolutionEntry.setTableName(newTableName) timeStamp = System.currentTimeMillis() @@ -117,7 +116,8 @@ private[sql] case class CarbonAlterTableRenameCommand( val fileType = FileFactory.getFileType(tableMetadataFile) val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName, newTableName, carbonTable.getCarbonTableIdentifier.getTableId) - var newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName) + var newTablePath = CarbonTablePath.getNewTablePath( + oldTableIdentifier.getTablePath, newTableIdentifier.getTableName) metastore.removeTableFromMetadata(oldDatabaseName, oldTableName) val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] .getClient() @@ -139,9 +139,9 @@ private[sql] case class CarbonAlterTableRenameCommand( // changed the rename order to deal with situation when carbon table and hive table // will point to the same tablePath if (FileFactory.isFileExist(tableMetadataFile, fileType)) { - val rename = FileFactory.getCarbonFile(oldTablePath.getPath, fileType) - .renameForce(oldTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR + - newTableName) + val rename = FileFactory.getCarbonFile(oldTableIdentifier.getTablePath, fileType) + .renameForce( + CarbonTablePath.getNewTablePath(oldTableIdentifier.getTablePath, newTableName)) if (!rename) { renameBadRecords(newTableName, oldTableName, oldDatabaseName) sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName") @@ -149,7 +149,7 @@ private[sql] case class CarbonAlterTableRenameCommand( } val updatedParts = updatePartitionLocations( partitions, - oldTablePath.getPath, + oldTableIdentifier.getTablePath, newTablePath, sparkSession) @@ -191,13 +191,11 @@ private[sql] case class CarbonAlterTableRenameCommand( case e: Exception => LOGGER.error(e, "Rename table failed: " + e.getMessage) if (carbonTable != null) { - AlterTableUtil - .revertRenameTableChanges(oldTableIdentifier, - newTableName, - carbonTable.getTablePath, - carbonTable.getCarbonTableIdentifier.getTableId, - timeStamp)( - sparkSession) + AlterTableUtil.revertRenameTableChanges( + newTableName, + carbonTable, + timeStamp)( + sparkSession) renameBadRecords(newTableName, oldTableName, oldDatabaseName) } throwMetadataException(oldDatabaseName, oldTableName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index b44dc7e..fd09e48 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -46,7 +46,8 @@ import org.apache.carbondata.core.metadata.schema import org.apache.carbondata.core.metadata.schema.table import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} -import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} @@ -209,11 +210,7 @@ class CarbonFileMetastore extends CarbonMetaStore { val dbName = identifier.getCarbonTableIdentifier.getDatabaseName val tableName = identifier.getCarbonTableIdentifier.getTableName val tablePath = identifier.getTablePath - val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(), - tableName.toLowerCase(), UUID.randomUUID().toString) - val carbonTablePath = - CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath + val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath) val fileType = FileFactory.getFileType(tableMetadataFile) if (FileFactory.isFileExist(tableMetadataFile, fileType)) { val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName) @@ -240,13 +237,13 @@ class CarbonFileMetastore extends CarbonMetaStore { thriftTableInfo: org.apache.carbondata.format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, tablePath: String) (sparkSession: SparkSession): String = { - val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier) + val identifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier) val schemaConverter = new ThriftWrapperSchemaConverterImpl if (schemaEvolutionEntry != null) { thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) } - val oldTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier) - val newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName) + val newTablePath = CarbonTablePath.getNewTablePath( + identifier.getTablePath, newTableIdentifier.getTableName) val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( thriftTableInfo, newTableIdentifier.getDatabaseName, @@ -341,8 +338,7 @@ class CarbonFileMetastore extends CarbonMetaStore { private def createSchemaThriftFile( identifier: AbsoluteTableIdentifier, thriftTableInfo: TableInfo): String = { - val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier) - val schemaFilePath = carbonTablePath.getSchemaFilePath + val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath) val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath) val fileType = FileFactory.getFileType(schemaMetadataPath) if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { @@ -356,7 +352,7 @@ class CarbonFileMetastore extends CarbonMetaStore { thriftWriter.write(thriftTableInfo) thriftWriter.close() updateSchemasUpdatedTime(touchSchemaFileSystemTime()) - carbonTablePath.getPath + identifier.getTablePath } protected def addTableCache( @@ -431,8 +427,7 @@ class CarbonFileMetastore extends CarbonMetaStore { (sparkSession: SparkSession) { val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName - val metadataFilePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier) - .getMetadataDirectoryPath + val metadataFilePath = CarbonTablePath.getMetadataPath(absoluteTableIdentifier.getTablePath) val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName, tableName) if (null != carbonTable) { // clear driver B-tree and dictionary cache @@ -528,9 +523,9 @@ class CarbonFileMetastore extends CarbonMetaStore { override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = metadata.carbonTables - override def getThriftTableInfo(tablePath: CarbonTablePath) + override def getThriftTableInfo(carbonTable: CarbonTable) (sparkSession: SparkSession): TableInfo = { - val tableMetadataFile = tablePath.getSchemaFilePath + val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath) CarbonUtil.readSchemaFile(tableMetadataFile) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index 16ef38d..5e242b7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -28,7 +28,7 @@ import org.apache.carbondata.core.metadata.{schema, AbsoluteTableIdentifier, Car import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonUtil -import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.format import org.apache.carbondata.format.SchemaEvolutionEntry import org.apache.carbondata.spark.util.CarbonSparkUtil @@ -96,12 +96,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { Seq() } - override def getThriftTableInfo(tablePath: CarbonTablePath) + override def getThriftTableInfo(carbonTable: CarbonTable) (sparkSession: SparkSession): format.TableInfo = { - val identifier = tablePath.getCarbonTableIdentifier - val relation = lookupRelation(TableIdentifier(identifier.getTableName, - Some(identifier.getDatabaseName)))(sparkSession).asInstanceOf[CarbonRelation] - val carbonTable = relation.metaData.carbonTable val schemaConverter = new ThriftWrapperSchemaConverterImpl schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo, carbonTable.getDatabaseName, @@ -148,7 +144,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { thriftTableInfo: org.apache.carbondata.format.TableInfo, carbonTablePath: String)(sparkSession: SparkSession): String = { val schemaConverter = new ThriftWrapperSchemaConverterImpl - updateHiveMetaStoreForAlter(newTableIdentifier, + updateHiveMetaStoreForAlter( + newTableIdentifier, oldTableIdentifier, thriftTableInfo, carbonTablePath, @@ -163,7 +160,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { sparkSession: SparkSession, schemaConverter: ThriftWrapperSchemaConverterImpl) = { val newTablePath = - CarbonUtil.getNewTablePath(new Path(oldTablePath), newTableIdentifier.getTableName) + CarbonTablePath.getNewTablePath(oldTablePath, newTableIdentifier.getTableName) val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( thriftTableInfo, newTableIdentifier.getDatabaseName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala index 93c7c09..2c55e12 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala @@ -27,10 +27,8 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable import org.apache.carbondata.core.metadata.schema import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} - /** * Interface for Carbonmetastore */ @@ -143,7 +141,7 @@ trait CarbonMetaStore { def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] - def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo + def getThriftTableInfo(carbonTable: CarbonTable)(sparkSession: SparkSession): TableInfo def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala index 7bf8536..e9e93d2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala @@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension} import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonUtil -import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} +import org.apache.carbondata.core.util.path.CarbonTablePath /** * Represents logical plan for one carbon table @@ -212,10 +212,7 @@ case class CarbonRelation( .getValidAndInvalidSegments.getValidSegments.isEmpty) { sizeInBytesLocalValue = 0L } else { - val carbonTablePath = CarbonStorePath.getCarbonTablePath( - carbonTable.getTablePath, - carbonTable.getCarbonTableIdentifier) - val tablePath = carbonTablePath.getPath + val tablePath = carbonTable.getTablePath val fileType = FileFactory.getFileType(tablePath) if (FileFactory.isFileExist(tablePath, fileType)) { // get the valid segments @@ -225,9 +222,9 @@ case class CarbonRelation( // for each segment calculate the size segments.foreach {validSeg => if (validSeg.getSegmentFileName != null) { - val fileStore = new SegmentFileStore(tablePath, validSeg.getSegmentFileName) size = size + CarbonUtil.getSizeOfSegment( - carbonTablePath, new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName)) + carbonTable.getTablePath, + new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName)) } else { size = size + FileFactory.getDirectorySize( CarbonTablePath.getSegmentPath(tablePath, validSeg.getSegmentNo)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 8ebd5a9..bc36e9c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -36,7 +36,8 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonUtil -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -176,41 +177,28 @@ object AlterTableUtil { /** * This method reverts the changes to the schema if the rename table command fails. - * - * @param oldTableIdentifier - * @param newTableName - * @param timeStamp - * @param sparkSession */ - def revertRenameTableChanges(oldTableIdentifier: TableIdentifier, + def revertRenameTableChanges( newTableName: String, - tablePath: String, - tableId: String, + oldCarbonTable: CarbonTable, timeStamp: Long) (sparkSession: SparkSession): Unit = { - val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) - val oldCarbonTableIdentifier = new CarbonTableIdentifier(database, - oldTableIdentifier.table, tableId) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, oldCarbonTableIdentifier) + val tablePath = oldCarbonTable.getTablePath + val tableId = oldCarbonTable.getCarbonTableIdentifier.getTableId + val oldCarbonTableIdentifier = oldCarbonTable.getCarbonTableIdentifier + val database = oldCarbonTable.getDatabaseName val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId) - val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newTableName) + val newTablePath = CarbonTablePath.getNewTablePath(tablePath, newTableName) val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val fileType = FileFactory.getFileType(tablePath) if (FileFactory.isFileExist(tablePath, fileType)) { - val tableInfo = if (metastore.isReadFromHiveMetaStore) { - // In case of hive metastore we first update the carbonschema inside old table only. - metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(tablePath, - new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)))(sparkSession) - } else { - metastore.getThriftTableInfo(carbonTablePath)(sparkSession) - } + val tableInfo = metastore.getThriftTableInfo(oldCarbonTable)(sparkSession) val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { - LOGGER.error(s"Reverting changes for $database.${ oldTableIdentifier.table }") - FileFactory.getCarbonFile(carbonTablePath.getPath, fileType) - .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR + - oldTableIdentifier.table) + LOGGER.error(s"Reverting changes for $database.${oldCarbonTable.getTableName}") + FileFactory.getCarbonFile(tablePath, fileType) + .renameForce(CarbonTablePath.getNewTablePath(tablePath, oldCarbonTable.getTableName)) val absoluteTableIdentifier = AbsoluteTableIdentifier.from( newTablePath, newCarbonTableIdentifier) @@ -233,9 +221,7 @@ object AlterTableUtil { (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath, - carbonTable.getCarbonTableIdentifier) - val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -260,9 +246,7 @@ object AlterTableUtil { (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath, - carbonTable.getCarbonTableIdentifier) - val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -293,9 +277,7 @@ object AlterTableUtil { (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath, - carbonTable.getCarbonTableIdentifier) - val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -344,9 +326,7 @@ object AlterTableUtil { carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) // get the latest carbon table // read the latest schema file - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath, - carbonTable.getCarbonTableIdentifier) - val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) val schemaConverter = new ThriftWrapperSchemaConverterImpl() val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( thriftTableInfo, http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala index aadee81..0bdef8a 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala @@ -856,9 +856,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { } def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = { - val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, - carbonTable.getTablePath) - val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { override def accept(file: CarbonFile): Boolean = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala index e3678cd..1d41ddc 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala @@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.util.TableOptionConstant @@ -65,9 +65,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll { carbonLoadModel.setCsvHeaderColumns( CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration)) // Create table and metadata folders if not exist - val carbonTablePath = CarbonStorePath - .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier) - val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath + val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath) val fileType = FileFactory.getFileType(metadataDirectoryPath) if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { FileFactory.mkdirs(metadataDirectoryPath, fileType) http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala index f9519f8..a465251 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.test.TestQueryExecutor import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.util.CarbonLoaderUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala index e543893..7ca0b56 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala @@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.util.TableOptionConstant @@ -179,9 +179,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration)) carbonLoadModel.setMaxColumns("100") // Create table and metadata folders if not exist - val carbonTablePath = CarbonStorePath - .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier) - val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath + val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath) val fileType = FileFactory.getFileType(metadataDirectoryPath) if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { FileFactory.mkdirs(metadataDirectoryPath, fileType) http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 4ae3737..dfffc8e 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -33,10 +33,14 @@ import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus} import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.spark.exception.ProcessMetaDataException class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { @@ -197,7 +201,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { val identifier = new TableIdentifier("batch_table", Option("streaming")) val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) .asInstanceOf[CarbonRelation].metaData.carbonTable - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) var server: ServerSocket = null try { server = getServerSocket @@ -205,7 +208,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { thread1.start() // use thread pool to catch the exception of sink thread val pool = Executors.newSingleThreadExecutor() - val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier) + val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier) val future = pool.submit(thread2) Thread.sleep(1000) thread1.interrupt() @@ -225,11 +228,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { val identifier = new TableIdentifier("stream_table_file", Option("streaming")) val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) .asInstanceOf[CarbonRelation].metaData.carbonTable - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) val csvDataDir = new File("target/csvdata").getCanonicalPath // streaming ingest 10 rows generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir) - val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1, + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, identifier) thread.start() Thread.sleep(2000) @@ -1086,12 +1088,11 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { val identifier = new TableIdentifier("stream_table_drop", Option("streaming")) val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) .asInstanceOf[CarbonRelation].metaData.carbonTable - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) var server: ServerSocket = null try { server = getServerSocket val thread1 = createWriteSocketThread(server, 2, 10, 3) - val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier, "force", 5, 1024L * 200, false) + val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier, "force", 5, 1024L * 200, false) thread1.start() thread2.start() Thread.sleep(1000) @@ -1195,7 +1196,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { def createSocketStreamingThread( spark: SparkSession, port: Int, - tablePath: CarbonTablePath, + carbonTable: CarbonTable, tableIdentifier: TableIdentifier, badRecordAction: String = "force", intervalSecond: Int = 2, @@ -1216,7 +1217,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { qry = readSocketDF.writeStream .format("carbondata") .trigger(ProcessingTime(s"$intervalSecond seconds")) - .option("checkpointLocation", tablePath.getStreamingCheckpointDir) + .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)) .option("bad_records_action", badRecordAction) .option("dbName", tableIdentifier.database.get) .option("tableName", tableIdentifier.table) @@ -1255,7 +1256,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { val identifier = new TableIdentifier(tableName, Option("streaming")) val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) .asInstanceOf[CarbonRelation].metaData.carbonTable - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) var server: ServerSocket = null try { server = getServerSocket() @@ -1268,7 +1268,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { val thread2 = createSocketStreamingThread( spark = spark, port = server.getLocalPort, - tablePath = tablePath, + carbonTable = carbonTable, tableIdentifier = identifier, badRecordAction = badRecordAction, intervalSecond = intervalOfIngest, @@ -1316,7 +1316,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { def createFileStreamingThread( spark: SparkSession, - tablePath: CarbonTablePath, + carbonTable: CarbonTable, csvDataDir: String, intervalSecond: Int, tableIdentifier: TableIdentifier): Thread = { @@ -1330,7 +1330,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { qry = readSocketDF.writeStream .format("carbondata") .trigger(ProcessingTime(s"${ intervalSecond } seconds")) - .option("checkpointLocation", tablePath.getStreamingCheckpointDir) + .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)) .option("dbName", tableIdentifier.database.get) .option("tableName", tableIdentifier.table) .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala index 3e3b2c5..064ff28 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala @@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus} import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.streaming.parser.CarbonStreamParser case class FileElement(school: Array[String], age: Integer) @@ -735,7 +735,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { def createSocketStreamingThread( spark: SparkSession, port: Int, - tablePath: CarbonTablePath, + tablePath: String, tableIdentifier: TableIdentifier, badRecordAction: String = "force", intervalSecond: Int = 2, @@ -776,7 +776,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { qry = readSocketDF.writeStream .format("carbondata") .trigger(ProcessingTime(s"$intervalSecond seconds")) - .option("checkpointLocation", tablePath.getStreamingCheckpointDir) + .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath)) .option("bad_records_action", badRecordAction) .option("dbName", tableIdentifier.database.get) .option("tableName", tableIdentifier.table) @@ -817,7 +817,6 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { val identifier = new TableIdentifier(tableName, Option("streaming1")) val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) .asInstanceOf[CarbonRelation].metaData.carbonTable - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) var server: ServerSocket = null try { server = getServerSocket() @@ -830,7 +829,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { val thread2 = createSocketStreamingThread( spark = spark, port = server.getLocalPort, - tablePath = tablePath, + tablePath = carbonTable.getTablePath, tableIdentifier = identifier, badRecordAction = badRecordAction, intervalSecond = intervalOfIngest, http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala index 4d5f88c..27ed1bd 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala @@ -94,7 +94,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll { } val carbonTable = CarbonMetadata.getInstance.getCarbonTable("default", "reverttest") - assert(new File(carbonTable.getMetaDataFilepath).listFiles().length < 6) + assert(new File(carbonTable.getMetadataPath).listFiles().length < 6) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java index a8db6c9..bbc3697 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java @@ -34,7 +34,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -56,43 +55,39 @@ public class TableProcessingOperations { */ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, final boolean isCompactionFlow) throws IOException { - String metaDataLocation = carbonTable.getMetaDataFilepath(); + String metaDataLocation = carbonTable.getMetadataPath(); final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier()); //delete folder which metadata no exist in tablestatus - for (int i = 0; i < carbonTable.getPartitionCount(); i++) { - String partitionPath = carbonTablePath.getPartitionDir(); - FileFactory.FileType fileType = FileFactory.getFileType(partitionPath); - if (FileFactory.isFileExist(partitionPath, fileType)) { - CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType); - CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile path) { - String segmentId = - CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy"); - boolean found = false; - for (int j = 0; j < details.length; j++) { - if (details[j].getLoadName().equals(segmentId)) { - found = true; - break; - } - } - return !found; - } - }); - for (int k = 0; k < listFiles.length; k++) { + String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath()); + FileFactory.FileType fileType = FileFactory.getFileType(partitionPath); + if (FileFactory.isFileExist(partitionPath, fileType)) { + CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType); + CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile path) { String segmentId = - CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy"); - if (isCompactionFlow) { - if (segmentId.contains(".")) { - CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); - } - } else { - if (!segmentId.contains(".")) { - CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); + CarbonTablePath.DataFileUtil.getSegmentId(path.getAbsolutePath() + "/dummy"); + boolean found = false; + for (int j = 0; j < details.length; j++) { + if (details[j].getLoadName().equals(segmentId)) { + found = true; + break; } } + return !found; + } + }); + for (int k = 0; k < listFiles.length; k++) { + String segmentId = + CarbonTablePath.DataFileUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy"); + if (isCompactionFlow) { + if (segmentId.contains(".")) { + CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); + } + } else { + if (!segmentId.contains(".")) { + CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java index 4cd5014..193d192 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java @@ -34,8 +34,6 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.datatypes.ArrayDataType; import org.apache.carbondata.processing.datatypes.GenericDataType; @@ -105,12 +103,11 @@ public class FieldEncoderFactory { ColumnIdentifier parentColumnIdentifier = new ColumnIdentifier(parentColumnTableRelation.getColumnId(), null, dataField.getColumn().getDataType()); - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); AbsoluteTableIdentifier parentAbsoluteTableIdentifier = AbsoluteTableIdentifier.from( - CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier.getTableName()), - parentTableIdentifier); + CarbonTablePath.getNewTablePath( + absoluteTableIdentifier.getTablePath(), parentTableIdentifier.getTableName()), + parentTableIdentifier); identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier, parentColumnIdentifier, dataField.getColumn().getDataType()); return new DictionaryFieldConverterImpl(dataField, cache, parentAbsoluteTableIdentifier, http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java index d3caa99..a08177a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java @@ -19,10 +19,8 @@ package org.apache.carbondata.processing.merger; import java.util.List; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.store.CarbonDataFileAttributes; import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; @@ -42,13 +40,11 @@ public abstract class AbstractResultProcessor { public abstract boolean execute(List resultIteratorList); protected void setDataFileAttributesInModel(CarbonLoadModel loadModel, - CompactionType compactionType, CarbonTable carbonTable, - CarbonFactDataHandlerModel carbonFactDataHandlerModel) { + CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel) { CarbonDataFileAttributes carbonDataFileAttributes; if (compactionType == CompactionType.IUD_UPDDEL_DELTA) { long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(), - CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(), - carbonTable.getCarbonTableIdentifier())); + loadModel.getTablePath()); // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will // be written in same segment. So the TaskNo should be incremented by 1 from max val. long index = taskNo + 1; http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java index 2a69f0d..a4d3d2b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java @@ -272,7 +272,7 @@ public class CarbonCompactionUtil { public static CarbonTable getNextTableToCompact(CarbonTable[] carbonTables, List skipList) { for (CarbonTable ctable : carbonTables) { - String metadataPath = ctable.getMetaDataFilepath(); + String metadataPath = ctable.getMetadataPath(); // check for the compaction required file and at the same time exclude the tables which are // present in the skip list. if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index c141636..4579c85 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -44,7 +44,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -169,15 +168,13 @@ public final class CarbonDataMergerUtil { // End Timestamp. // Table Update Status Metadata Update. - AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier identifier = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - SegmentUpdateStatusManager segmentUpdateStatusManager = - new SegmentUpdateStatusManager(absoluteTableIdentifier); + new SegmentUpdateStatusManager(identifier); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock(); ICarbonLock statusLock = segmentStatusManager.getTableStatusLock(); @@ -224,7 +221,7 @@ public final class CarbonDataMergerUtil { } LoadMetadataDetails[] loadDetails = - segmentStatusManager.readLoadMetadata(metaDataFilepath); + SegmentStatusManager.readLoadMetadata(metaDataFilepath); for (LoadMetadataDetails loadDetail : loadDetails) { if (loadsToMerge.contains(loadDetail)) { @@ -237,18 +234,18 @@ public final class CarbonDataMergerUtil { } } - segmentUpdateStatusManager - .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp); - segmentStatusManager - .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails); + segmentUpdateStatusManager.writeLoadDetailsIntoFile( + Arrays.asList(updateLists), timestamp); + SegmentStatusManager.writeLoadDetailsIntoFile( + CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()), loadDetails); status = true; } else { LOGGER.error("Not able to acquire the lock."); status = false; } } catch (IOException e) { - LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath - .getMetadataDirectoryPath()); + LOGGER.error("Error while updating metadata. The metadata file path is " + + CarbonTablePath.getMetadataPath(identifier.getTablePath())); status = false; } finally { @@ -284,9 +281,9 @@ public final class CarbonDataMergerUtil { String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel, CompactionType compactionType, String segmentFile) throws IOException { boolean tableStatusUpdationStatus = false; - AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier identifier = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); @@ -295,10 +292,7 @@ public final class CarbonDataMergerUtil { LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "." + carbonLoadModel.getTableName() + " for table status updation "); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier); - - String statusFilePath = carbonTablePath.getTableStatusFilePath(); + String statusFilePath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath); @@ -617,12 +611,10 @@ public final class CarbonDataMergerUtil { // variable to store one segment size across partition. long sizeOfOneSegmentAcrossPartition; if (segment.getSegmentFile() != null) { - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier()); - sizeOfOneSegmentAcrossPartition = CarbonUtil - .getSizeOfSegment(carbonTablePath, new Segment(segId, segment.getSegmentFile())); + sizeOfOneSegmentAcrossPartition = CarbonUtil.getSizeOfSegment( + carbonTable.getTablePath(), new Segment(segId, segment.getSegmentFile())); } else { - sizeOfOneSegmentAcrossPartition = getSizeOfSegment(tablePath, tableIdentifier, segId); + sizeOfOneSegmentAcrossPartition = getSizeOfSegment(carbonTable.getTablePath(), segId); } // if size of a segment is greater than the Major compaction size. then ignore it. @@ -662,35 +654,17 @@ public final class CarbonDataMergerUtil { /** * For calculating the size of the specified segment * @param tablePath the store path of the segment - * @param tableIdentifier identifier of table that the segment belong to * @param segId segment id * @return the data size of the segment */ - private static long getSizeOfSegment(String tablePath, - CarbonTableIdentifier tableIdentifier, String segId) { - String loadPath = getStoreLocation(tablePath, tableIdentifier, segId); + private static long getSizeOfSegment(String tablePath, String segId) { + String loadPath = CarbonTablePath.getSegmentPath(tablePath, segId); CarbonFile segmentFolder = FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath)); return getSizeOfFactFileInLoad(segmentFolder); } /** - * This method will get the store location for the given path, segemnt id and partition id - * - * @param tablePath - * @param carbonTableIdentifier identifier of catbon table that the segment belong to - * @param segmentId segment id - * @return the store location of the segment - */ - private static String getStoreLocation(String tablePath, - CarbonTableIdentifier carbonTableIdentifier, String segmentId) { - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier); - return carbonTablePath.getCarbonDataDirectoryPath(segmentId); - } - - - /** * Identify the segments to be merged based on the segment count * * @param listOfSegmentsAfterPreserve the list of segments after @@ -1033,21 +1007,19 @@ public final class CarbonDataMergerUtil { * if UpdateDelta Files are more than IUD Compaction threshold. * * @param seg - * @param absoluteTableIdentifier + * @param identifier * @param segmentUpdateStatusManager * @param numberDeltaFilesThreshold * @return */ public static Boolean checkUpdateDeltaFilesInSeg(Segment seg, - AbsoluteTableIdentifier absoluteTableIdentifier, - SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { + AbsoluteTableIdentifier identifier, SegmentUpdateStatusManager segmentUpdateStatusManager, + int numberDeltaFilesThreshold) { CarbonFile[] updateDeltaFiles = null; Set uniqueBlocks = new HashSet(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(seg.getSegmentNo()); + String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), seg.getSegmentNo()); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); CarbonFile[] allSegmentFiles = segDir.listFiles(); @@ -1295,15 +1267,12 @@ public final class CarbonDataMergerUtil { CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true); // Update the Table Status. - String metaDataFilepath = table.getMetaDataFilepath(); - AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier(); - - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier); + String metaDataFilepath = table.getMetadataPath(); + AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier(); - String tableStatusPath = carbonTablePath.getTableStatusFilePath(); + String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); @@ -1317,7 +1286,7 @@ public final class CarbonDataMergerUtil { + " for table status updation"); LoadMetadataDetails[] listOfLoadFolderDetailsArray = - segmentStatusManager.readLoadMetadata(metaDataFilepath); + SegmentStatusManager.readLoadMetadata(metaDataFilepath); for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { if (loadMetadata.getLoadName().equalsIgnoreCase("0")) { @@ -1326,7 +1295,7 @@ public final class CarbonDataMergerUtil { } } try { - segmentStatusManager + SegmentStatusManager .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); } catch (IOException e) { return false; http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index a7b1711..324df2a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -413,8 +413,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName, tempStoreLocation, carbonStoreLocation); - setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable, - carbonFactDataHandlerModel); + setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel); dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index 569e026..4aca13a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -83,8 +83,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, tempStoreLocation, carbonStoreLocation); - setDataFileAttributesInModel(loadModel, compactionType, carbonTable, - carbonFactDataHandlerModel); + setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel); carbonFactDataHandlerModel.setCompactionFlow(true); dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 732a7e8..5062a78 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.datamap.DataMapWriterListener; import org.apache.carbondata.processing.datatypes.GenericDataType; @@ -337,9 +336,8 @@ public class CarbonFactDataHandlerModel { return configuration.getDataWritePath(); } AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - String carbonDataDirectoryPath = carbonTablePath - .getCarbonDataDirectoryPath(configuration.getSegmentId() + ""); + String carbonDataDirectoryPath = CarbonTablePath + .getSegmentPath(absoluteTableIdentifier.getTablePath(), configuration.getSegmentId() + ""); CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath); return carbonDataDirectoryPath; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 8a6cbe4..64e50b0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -44,7 +44,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.datatypes.ArrayDataType; import org.apache.carbondata.processing.datatypes.GenericDataType; @@ -148,9 +147,7 @@ public final class CarbonDataProcessorUtil { for (int i = 0 ; i < baseTmpStorePathArray.length; i++) { String tmpStore = baseTmpStorePathArray[i]; - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(tmpStore, carbonTable.getCarbonTableIdentifier()); - String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); + String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(tmpStore, segmentId); localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + taskId; } @@ -399,14 +396,10 @@ public final class CarbonDataProcessorUtil { public static String createCarbonStoreLocation(String factStoreLocation, String databaseName, String tableName, String segmentId) { CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); - CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier(); - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier); - String carbonDataDirectoryPath = - carbonTablePath.getCarbonDataDirectoryPath(segmentId); - return carbonDataDirectoryPath; + return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId); } + /** * initialise data type for measures for their storage format */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index d958937..276c3fe 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -46,7 +46,6 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation; import org.apache.carbondata.core.locks.CarbonLockUtil; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.ColumnIdentifier; import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.datatype.DataType; @@ -56,7 +55,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -76,11 +74,8 @@ public final class CarbonLoaderUtil { } public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) { - CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable(); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier()); - - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(currentLoad + ""); + String segmentPath = CarbonTablePath.getSegmentPath( + loadModel.getTablePath(), currentLoad + ""); deleteStorePath(segmentPath); } @@ -93,33 +88,26 @@ public final class CarbonLoaderUtil { */ public static boolean isValidSegment(CarbonLoadModel loadModel, int currentLoad) { - CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema() - .getCarbonTable(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath( - loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier()); int fileCount = 0; - int partitionCount = carbonTable.getPartitionCount(); - for (int i = 0; i < partitionCount; i++) { - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath( - currentLoad + ""); - CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath, - FileFactory.getFileType(segmentPath)); - CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() { - - @Override - public boolean accept(CarbonFile file) { - return file.getName().endsWith( - CarbonTablePath.getCarbonIndexExtension()) - || file.getName().endsWith( - CarbonTablePath.getCarbonDataExtension()); - } - - }); - fileCount += files.length; - if (files.length > 0) { - return true; + String segmentPath = CarbonTablePath.getSegmentPath( + loadModel.getTablePath(), currentLoad + ""); + CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath, + FileFactory.getFileType(segmentPath)); + CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() { + + @Override + public boolean accept(CarbonFile file) { + return file.getName().endsWith( + CarbonTablePath.getCarbonIndexExtension()) + || file.getName().endsWith( + CarbonTablePath.getCarbonDataExtension()); } + + }); + fileCount += files.length; + if (files.length > 0) { + return true; } if (fileCount == 0) { return false; @@ -185,21 +173,21 @@ public final class CarbonLoaderUtil { CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid, List segmentsToBeDeleted, List segmentFilesTobeUpdated) throws IOException { boolean status = false; - AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier identifier = loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - String metadataPath = carbonTablePath.getMetadataDirectoryPath(); + String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath()); FileType fileType = FileFactory.getFileType(metadataPath); if (!FileFactory.isFileExist(metadataPath, fileType)) { FileFactory.mkdirs(metadataPath, fileType); } String tableStatusPath; if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !uuid.isEmpty()) { - tableStatusPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid); + tableStatusPath = CarbonTablePath.getTableStatusFilePathWithUUID( + identifier.getTablePath(), uuid); } else { - tableStatusPath = carbonTablePath.getTableStatusFilePath(); + tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); } - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); int retryCount = CarbonLockUtil .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, @@ -213,7 +201,8 @@ public final class CarbonLoaderUtil { "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName() + " for table status updation"); LoadMetadataDetails[] listOfLoadFolderDetailsArray = - SegmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath()); + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(identifier.getTablePath())); List listOfLoadFolderDetails = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); List staleFolders = new ArrayList<>(); @@ -240,12 +229,12 @@ public final class CarbonLoaderUtil { for (LoadMetadataDetails entry : listOfLoadFolderDetails) { if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS && SegmentStatusManager.isLoadInProgress( - absoluteTableIdentifier, entry.getLoadName())) { + identifier, entry.getLoadName())) { throw new RuntimeException("Already insert overwrite is in progress"); } else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS && SegmentStatusManager.isLoadInProgress( - absoluteTableIdentifier, entry.getLoadName())) { + identifier, entry.getLoadName())) { throw new RuntimeException("Already insert into or load is in progress"); } } @@ -270,7 +259,7 @@ public final class CarbonLoaderUtil { entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE); // For insert overwrite, we will delete the old segment folder immediately // So collect the old segments here - addToStaleFolders(carbonTablePath, staleFolders, entry); + addToStaleFolders(identifier, staleFolders, entry); } } } @@ -283,7 +272,7 @@ public final class CarbonLoaderUtil { // when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE // so empty segment folder should be deleted if (newMetaEntry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) { - addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry); + addToStaleFolders(identifier, staleFolders, newMetaEntry); } for (LoadMetadataDetails detail: listOfLoadFolderDetails) { @@ -382,9 +371,10 @@ public final class CarbonLoaderUtil { return status; } - private static void addToStaleFolders(CarbonTablePath carbonTablePath, + private static void addToStaleFolders(AbsoluteTableIdentifier identifier, List staleFolders, LoadMetadataDetails entry) throws IOException { - String path = carbonTablePath.getCarbonDataDirectoryPath(entry.getLoadName()); + String path = CarbonTablePath.getSegmentPath( + identifier.getTablePath(), entry.getLoadName()); // add to the deletion list only if file exist else HDFS file system will throw // exception while deleting the file if file path does not exist if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) { @@ -410,11 +400,9 @@ public final class CarbonLoaderUtil { loadMetadataDetails.setLoadStartTime(loadStartTime); } - public static void writeLoadMetadata(AbsoluteTableIdentifier absoluteTableIdentifier, + public static void writeLoadMetadata(AbsoluteTableIdentifier identifier, List listOfLoadFolderDetails) throws IOException { - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - String dataLoadLocation = carbonTablePath.getTableStatusFilePath(); + String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); DataOutputStream dataOutputStream; Gson gsonObjectToWrite = new Gson(); @@ -962,10 +950,8 @@ public final class CarbonLoaderUtil { * This method will get the store location for the given path, segment id and partition id */ public static void checkAndCreateCarbonDataLocation(String segmentId, CarbonTable carbonTable) { - CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier(); - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), carbonTableIdentifier); - String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath(segmentId); + String segmentFolder = CarbonTablePath.getSegmentPath( + carbonTable.getTablePath(), segmentId); CarbonUtil.checkAndCreateFolder(segmentFolder); } @@ -994,9 +980,8 @@ public final class CarbonLoaderUtil { */ public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails, String segmentId, CarbonTable carbonTable) throws IOException { - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier())); - Map dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, + Map dataIndexSize = CarbonUtil.getDataSizeAndIndexSize( + carbonTable.getTablePath(), new Segment(segmentId, loadMetadataDetails.getSegmentFile())); Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE); loadMetadataDetails.setDataSize(String.valueOf(dataSize)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java index 288cd54..c00cc86 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java @@ -35,7 +35,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; public final class DeleteLoadFolders { @@ -50,15 +49,14 @@ public final class DeleteLoadFolders { /** * returns segment path * - * @param absoluteTableIdentifier + * @param identifier * @param oneLoad * @return */ - private static String getSegmentPath(AbsoluteTableIdentifier absoluteTableIdentifier, + private static String getSegmentPath(AbsoluteTableIdentifier identifier, LoadMetadataDetails oneLoad) { - CarbonTablePath carbon = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); String segmentId = oneLoad.getLoadName(); - return carbon.getCarbonDataDirectoryPath(segmentId); + return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId); } public static void physicalFactAndMeasureMetadataDeletion(