carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [20/50] [abbrv] carbondata git commit: [CARBONDATA-2025] Unify all path construction through CarbonTablePath static method
Date Sun, 04 Mar 2018 12:33:19 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 40b5cfc..753e637 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -34,7 +34,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException}
@@ -97,8 +97,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
       // get the latest carbon table and check for column existence
-      val oldTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier)
-      val tableMetadataFile = oldTablePath.getPath
+      val tableMetadataFile = oldTableIdentifier.getTablePath
       val operationContext = new OperationContext
       // TODO: Pass new Table Path in pre-event.
       val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
@@ -108,7 +107,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(oldTablePath)(sparkSession)
+        metastore.getThriftTableInfo(carbonTable)(sparkSession)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
       schemaEvolutionEntry.setTableName(newTableName)
       timeStamp = System.currentTimeMillis()
@@ -117,7 +116,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val fileType = FileFactory.getFileType(tableMetadataFile)
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
         newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
-      var newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
+      var newTablePath = CarbonTablePath.getNewTablePath(
+        oldTableIdentifier.getTablePath, newTableIdentifier.getTableName)
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
         .getClient()
@@ -139,9 +139,9 @@ private[sql] case class CarbonAlterTableRenameCommand(
       // changed the rename order to deal with situation when carbon table and hive table
       // will point to the same tablePath
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-        val rename = FileFactory.getCarbonFile(oldTablePath.getPath, fileType)
-          .renameForce(oldTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
-                       newTableName)
+        val rename = FileFactory.getCarbonFile(oldTableIdentifier.getTablePath, fileType)
+          .renameForce(
+            CarbonTablePath.getNewTablePath(oldTableIdentifier.getTablePath, newTableName))
         if (!rename) {
           renameBadRecords(newTableName, oldTableName, oldDatabaseName)
           sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
@@ -149,7 +149,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
       }
       val updatedParts = updatePartitionLocations(
         partitions,
-        oldTablePath.getPath,
+        oldTableIdentifier.getTablePath,
         newTablePath,
         sparkSession)
 
@@ -191,13 +191,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
       case e: Exception =>
         LOGGER.error(e, "Rename table failed: " + e.getMessage)
         if (carbonTable != null) {
-          AlterTableUtil
-            .revertRenameTableChanges(oldTableIdentifier,
-              newTableName,
-              carbonTable.getTablePath,
-              carbonTable.getCarbonTableIdentifier.getTableId,
-              timeStamp)(
-              sparkSession)
+          AlterTableUtil.revertRenameTableChanges(
+            newTableName,
+            carbonTable,
+            timeStamp)(
+            sparkSession)
           renameBadRecords(newTableName, oldTableName, oldDatabaseName)
         }
         throwMetadataException(oldDatabaseName, oldTableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index b44dc7e..fd09e48 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -46,7 +46,8 @@ import org.apache.carbondata.core.metadata.schema
 import org.apache.carbondata.core.metadata.schema.table
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -209,11 +210,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = identifier.getCarbonTableIdentifier.getTableName
     val tablePath = identifier.getTablePath
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
-      tableName.toLowerCase(), UUID.randomUUID().toString)
-    val carbonTablePath =
-      CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
-    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
     val fileType = FileFactory.getFileType(tableMetadataFile)
     if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
       val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
@@ -240,13 +237,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
       tablePath: String) (sparkSession: SparkSession): String = {
-    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier)
+    val identifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
-    val oldTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-    val newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
+    val newTablePath = CarbonTablePath.getNewTablePath(
+      identifier.getTablePath, newTableIdentifier.getTableName)
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
       thriftTableInfo,
       newTableIdentifier.getDatabaseName,
@@ -341,8 +338,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
   private def createSchemaThriftFile(
       identifier: AbsoluteTableIdentifier,
       thriftTableInfo: TableInfo): String = {
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
@@ -356,7 +352,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
     updateSchemasUpdatedTime(touchSchemaFileSystemTime())
-    carbonTablePath.getPath
+    identifier.getTablePath
   }
 
   protected def addTableCache(
@@ -431,8 +427,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     (sparkSession: SparkSession) {
     val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-      .getMetadataDirectoryPath
+    val metadataFilePath = CarbonTablePath.getMetadataPath(absoluteTableIdentifier.getTablePath)
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
@@ -528,9 +523,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
   override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
     metadata.carbonTables
 
-  override def getThriftTableInfo(tablePath: CarbonTablePath)
+  override def getThriftTableInfo(carbonTable: CarbonTable)
     (sparkSession: SparkSession): TableInfo = {
-    val tableMetadataFile = tablePath.getSchemaFilePath
+    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath)
     CarbonUtil.readSchemaFile(tableMetadataFile)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 16ef38d..5e242b7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -28,7 +28,7 @@ import org.apache.carbondata.core.metadata.{schema, AbsoluteTableIdentifier, Car
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.format
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -96,12 +96,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     Seq()
   }
 
-  override def getThriftTableInfo(tablePath: CarbonTablePath)
+  override def getThriftTableInfo(carbonTable: CarbonTable)
     (sparkSession: SparkSession): format.TableInfo = {
-    val identifier = tablePath.getCarbonTableIdentifier
-    val relation = lookupRelation(TableIdentifier(identifier.getTableName,
-      Some(identifier.getDatabaseName)))(sparkSession).asInstanceOf[CarbonRelation]
-    val carbonTable = relation.metaData.carbonTable
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
       carbonTable.getDatabaseName,
@@ -148,7 +144,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       carbonTablePath: String)(sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    updateHiveMetaStoreForAlter(newTableIdentifier,
+    updateHiveMetaStoreForAlter(
+      newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
       carbonTablePath,
@@ -163,7 +160,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       sparkSession: SparkSession,
       schemaConverter: ThriftWrapperSchemaConverterImpl) = {
     val newTablePath =
-      CarbonUtil.getNewTablePath(new Path(oldTablePath), newTableIdentifier.getTableName)
+      CarbonTablePath.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
       thriftTableInfo,
       newTableIdentifier.getDatabaseName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 93c7c09..2c55e12 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -27,10 +27,8 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable
 import org.apache.carbondata.core.metadata.schema
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 
-
 /**
  * Interface for Carbonmetastore
  */
@@ -143,7 +141,7 @@ trait CarbonMetaStore {
 
   def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
 
-  def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
+  def getThriftTableInfo(carbonTable: CarbonTable)(sparkSession: SparkSession): TableInfo
 
   def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 7bf8536..e9e93d2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 /**
  * Represents logical plan for one carbon table
@@ -212,10 +212,7 @@ case class CarbonRelation(
         .getValidAndInvalidSegments.getValidSegments.isEmpty) {
         sizeInBytesLocalValue = 0L
       } else {
-        val carbonTablePath = CarbonStorePath.getCarbonTablePath(
-          carbonTable.getTablePath,
-          carbonTable.getCarbonTableIdentifier)
-        val tablePath = carbonTablePath.getPath
+        val tablePath = carbonTable.getTablePath
         val fileType = FileFactory.getFileType(tablePath)
         if (FileFactory.isFileExist(tablePath, fileType)) {
           // get the valid segments
@@ -225,9 +222,9 @@ case class CarbonRelation(
           // for each segment calculate the size
           segments.foreach {validSeg =>
             if (validSeg.getSegmentFileName != null) {
-              val fileStore = new SegmentFileStore(tablePath, validSeg.getSegmentFileName)
               size = size + CarbonUtil.getSizeOfSegment(
-                carbonTablePath, new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName))
+                carbonTable.getTablePath,
+                new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName))
             } else {
               size = size + FileFactory.getDirectorySize(
                 CarbonTablePath.getSegmentPath(tablePath, validSeg.getSegmentNo))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 8ebd5a9..bc36e9c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -36,7 +36,8 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -176,41 +177,28 @@ object AlterTableUtil {
 
   /**
    * This method reverts the changes to the schema if the rename table command fails.
-   *
-   * @param oldTableIdentifier
-   * @param newTableName
-   * @param timeStamp
-   * @param sparkSession
    */
-  def revertRenameTableChanges(oldTableIdentifier: TableIdentifier,
+  def revertRenameTableChanges(
       newTableName: String,
-      tablePath: String,
-      tableId: String,
+      oldCarbonTable: CarbonTable,
       timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
-    val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val oldCarbonTableIdentifier = new CarbonTableIdentifier(database,
-      oldTableIdentifier.table, tableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, oldCarbonTableIdentifier)
+    val tablePath = oldCarbonTable.getTablePath
+    val tableId = oldCarbonTable.getCarbonTableIdentifier.getTableId
+    val oldCarbonTableIdentifier = oldCarbonTable.getCarbonTableIdentifier
+    val database = oldCarbonTable.getDatabaseName
     val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
-    val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newTableName)
+    val newTablePath = CarbonTablePath.getNewTablePath(tablePath, newTableName)
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val fileType = FileFactory.getFileType(tablePath)
     if (FileFactory.isFileExist(tablePath, fileType)) {
-      val tableInfo = if (metastore.isReadFromHiveMetaStore) {
-        // In case of hive metastore we first update the carbonschema inside old table only.
-        metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(tablePath,
-          new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)))(sparkSession)
-      } else {
-        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-      }
+      val tableInfo = metastore.getThriftTableInfo(oldCarbonTable)(sparkSession)
       val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
       val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
       if (updatedTime == timeStamp) {
-        LOGGER.error(s"Reverting changes for $database.${ oldTableIdentifier.table }")
-        FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
-          .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
-                       oldTableIdentifier.table)
+        LOGGER.error(s"Reverting changes for $database.${oldCarbonTable.getTableName}")
+        FileFactory.getCarbonFile(tablePath, fileType)
+          .renameForce(CarbonTablePath.getNewTablePath(tablePath, oldCarbonTable.getTableName))
         val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
           newTablePath,
           newCarbonTableIdentifier)
@@ -233,9 +221,7 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
-      carbonTable.getCarbonTableIdentifier)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -260,9 +246,7 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
-      carbonTable.getCarbonTableIdentifier)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -293,9 +277,7 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
-      carbonTable.getCarbonTableIdentifier)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -344,9 +326,7 @@ object AlterTableUtil {
       carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       // get the latest carbon table
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath,
-        carbonTable.getCarbonTableIdentifier)
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         thriftTableInfo,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index aadee81..0bdef8a 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -856,9 +856,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   }
 
   def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index e3678cd..1d41ddc 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.TableOptionConstant
 
@@ -65,9 +65,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setCsvHeaderColumns(
       CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
     // Create table and metadata folders if not exist
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier)
-    val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+    val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
     val fileType = FileFactory.getFileType(metadataDirectoryPath)
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
       FileFactory.mkdirs(metadataDirectoryPath, fileType)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index f9519f8..a465251 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index e543893..7ca0b56 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.TableOptionConstant
@@ -179,9 +179,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
       CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
     carbonLoadModel.setMaxColumns("100")
     // Create table and metadata folders if not exist
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier)
-    val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+    val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
     val fileType = FileFactory.getFileType(metadataDirectoryPath)
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
       FileFactory.mkdirs(metadataDirectoryPath, fileType)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 4ae3737..dfffc8e 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -33,10 +33,14 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
@@ -197,7 +201,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier("batch_table", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket
@@ -205,7 +208,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       thread1.start()
       // use thread pool to catch the exception of sink thread
       val pool = Executors.newSingleThreadExecutor()
-      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier)
+      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier)
       val future = pool.submit(thread2)
       Thread.sleep(1000)
       thread1.interrupt()
@@ -225,11 +228,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier("stream_table_file", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val csvDataDir = new File("target/csvdata").getCanonicalPath
     // streaming ingest 10 rows
     generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
-    val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1,
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
       identifier)
     thread.start()
     Thread.sleep(2000)
@@ -1086,12 +1088,11 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier("stream_table_drop", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket
       val thread1 = createWriteSocketThread(server, 2, 10, 3)
-      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier, "force", 5, 1024L * 200, false)
+      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier, "force", 5, 1024L * 200, false)
       thread1.start()
       thread2.start()
       Thread.sleep(1000)
@@ -1195,7 +1196,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   def createSocketStreamingThread(
       spark: SparkSession,
       port: Int,
-      tablePath: CarbonTablePath,
+      carbonTable: CarbonTable,
       tableIdentifier: TableIdentifier,
       badRecordAction: String = "force",
       intervalSecond: Int = 2,
@@ -1216,7 +1217,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime(s"$intervalSecond seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
             .option("bad_records_action", badRecordAction)
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
@@ -1255,7 +1256,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier(tableName, Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket()
@@ -1268,7 +1268,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       val thread2 = createSocketStreamingThread(
         spark = spark,
         port = server.getLocalPort,
-        tablePath = tablePath,
+        carbonTable = carbonTable,
         tableIdentifier = identifier,
         badRecordAction = badRecordAction,
         intervalSecond = intervalOfIngest,
@@ -1316,7 +1316,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
   def createFileStreamingThread(
       spark: SparkSession,
-      tablePath: CarbonTablePath,
+      carbonTable: CarbonTable,
       csvDataDir: String,
       intervalSecond: Int,
       tableIdentifier: TableIdentifier): Thread = {
@@ -1330,7 +1330,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime(s"${ intervalSecond } seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
             .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 3e3b2c5..064ff28 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 case class FileElement(school: Array[String], age: Integer)
@@ -735,7 +735,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
   def createSocketStreamingThread(
       spark: SparkSession,
       port: Int,
-      tablePath: CarbonTablePath,
+      tablePath: String,
       tableIdentifier: TableIdentifier,
       badRecordAction: String = "force",
       intervalSecond: Int = 2,
@@ -776,7 +776,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime(s"$intervalSecond seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
             .option("bad_records_action", badRecordAction)
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
@@ -817,7 +817,6 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier(tableName, Option("streaming1"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket()
@@ -830,7 +829,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
       val thread2 = createSocketStreamingThread(
         spark = spark,
         port = server.getLocalPort,
-        tablePath = tablePath,
+        tablePath = carbonTable.getTablePath,
         tableIdentifier = identifier,
         badRecordAction = badRecordAction,
         intervalSecond = intervalOfIngest,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 4d5f88c..27ed1bd 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -94,7 +94,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       }
       val carbonTable = CarbonMetadata.getInstance.getCarbonTable("default", "reverttest")
 
-      assert(new File(carbonTable.getMetaDataFilepath).listFiles().length < 6)
+      assert(new File(carbonTable.getMetadataPath).listFiles().length < 6)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index a8db6c9..bbc3697 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -56,43 +55,39 @@ public class TableProcessingOperations {
    */
   public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       final boolean isCompactionFlow) throws IOException {
-    String metaDataLocation = carbonTable.getMetaDataFilepath();
+    String metaDataLocation = carbonTable.getMetadataPath();
     final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
 
     //delete folder which metadata no exist in tablestatus
-    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      String partitionPath = carbonTablePath.getPartitionDir();
-      FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
-      if (FileFactory.isFileExist(partitionPath, fileType)) {
-        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
-        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-          @Override public boolean accept(CarbonFile path) {
-            String segmentId =
-                CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
-            boolean found = false;
-            for (int j = 0; j < details.length; j++) {
-              if (details[j].getLoadName().equals(segmentId)) {
-                found = true;
-                break;
-              }
-            }
-            return !found;
-          }
-        });
-        for (int k = 0; k < listFiles.length; k++) {
+    String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
+    FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
+    if (FileFactory.isFileExist(partitionPath, fileType)) {
+      CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
+      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile path) {
           String segmentId =
-              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
-          if (isCompactionFlow) {
-            if (segmentId.contains(".")) {
-              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          } else {
-            if (!segmentId.contains(".")) {
-              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+              CarbonTablePath.DataFileUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+          boolean found = false;
+          for (int j = 0; j < details.length; j++) {
+            if (details[j].getLoadName().equals(segmentId)) {
+              found = true;
+              break;
             }
           }
+          return !found;
+        }
+      });
+      for (int k = 0; k < listFiles.length; k++) {
+        String segmentId =
+            CarbonTablePath.DataFileUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+        if (isCompactionFlow) {
+          if (segmentId.contains(".")) {
+            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+          }
+        } else {
+          if (!segmentId.contains(".")) {
+            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 4cd5014..193d192 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -34,8 +34,6 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -105,12 +103,11 @@ public class FieldEncoderFactory {
           ColumnIdentifier parentColumnIdentifier =
               new ColumnIdentifier(parentColumnTableRelation.getColumnId(), null,
                   dataField.getColumn().getDataType());
-          CarbonTablePath carbonTablePath =
-              CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
           AbsoluteTableIdentifier parentAbsoluteTableIdentifier =
               AbsoluteTableIdentifier.from(
-              CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier.getTableName()),
-              parentTableIdentifier);
+                  CarbonTablePath.getNewTablePath(
+                      absoluteTableIdentifier.getTablePath(), parentTableIdentifier.getTableName()),
+                  parentTableIdentifier);
           identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier,
               parentColumnIdentifier, dataField.getColumn().getDataType());
           return new DictionaryFieldConverterImpl(dataField, cache, parentAbsoluteTableIdentifier,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index d3caa99..a08177a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -19,10 +19,8 @@ package org.apache.carbondata.processing.merger;
 
 import java.util.List;
 
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
@@ -42,13 +40,11 @@ public abstract class AbstractResultProcessor {
   public abstract boolean execute(List<RawResultIterator> resultIteratorList);
 
   protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
-      CompactionType compactionType, CarbonTable carbonTable,
-      CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+      CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
     CarbonDataFileAttributes carbonDataFileAttributes;
     if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
       long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
-          CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(),
-              carbonTable.getCarbonTableIdentifier()));
+          loadModel.getTablePath());
       // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
       // be written in same segment. So the TaskNo should be incremented by 1 from max val.
       long index = taskNo + 1;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 2a69f0d..a4d3d2b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -272,7 +272,7 @@ public class CarbonCompactionUtil {
   public static CarbonTable getNextTableToCompact(CarbonTable[] carbonTables,
       List<CarbonTableIdentifier> skipList) {
     for (CarbonTable ctable : carbonTables) {
-      String metadataPath = ctable.getMetaDataFilepath();
+      String metadataPath = ctable.getMetadataPath();
       // check for the compaction required file and at the same time exclude the tables which are
       // present in the skip list.
       if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index c141636..4579c85 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -44,7 +44,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -169,15 +168,13 @@ public final class CarbonDataMergerUtil {
     // End Timestamp.
 
     // Table Update Status Metadata Update.
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
 
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-
     SegmentUpdateStatusManager segmentUpdateStatusManager =
-        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+        new SegmentUpdateStatusManager(identifier);
 
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
 
     ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
     ICarbonLock statusLock = segmentStatusManager.getTableStatusLock();
@@ -224,7 +221,7 @@ public final class CarbonDataMergerUtil {
           }
 
           LoadMetadataDetails[] loadDetails =
-              segmentStatusManager.readLoadMetadata(metaDataFilepath);
+              SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
           for (LoadMetadataDetails loadDetail : loadDetails) {
             if (loadsToMerge.contains(loadDetail)) {
@@ -237,18 +234,18 @@ public final class CarbonDataMergerUtil {
             }
           }
 
-          segmentUpdateStatusManager
-              .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp);
-          segmentStatusManager
-              .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails);
+          segmentUpdateStatusManager.writeLoadDetailsIntoFile(
+              Arrays.asList(updateLists), timestamp);
+          SegmentStatusManager.writeLoadDetailsIntoFile(
+              CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()), loadDetails);
           status = true;
         } else {
           LOGGER.error("Not able to acquire the lock.");
           status = false;
         }
       } catch (IOException e) {
-        LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath
-            .getMetadataDirectoryPath());
+        LOGGER.error("Error while updating metadata. The metadata file path is " +
+            CarbonTablePath.getMetadataPath(identifier.getTablePath()));
         status = false;
 
       } finally {
@@ -284,9 +281,9 @@ public final class CarbonDataMergerUtil {
       String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel,
       CompactionType compactionType, String segmentFile) throws IOException {
     boolean tableStatusUpdationStatus = false;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
 
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
@@ -295,10 +292,7 @@ public final class CarbonDataMergerUtil {
         LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
             + carbonLoadModel.getTableName() + " for table status updation ");
 
-        CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier);
-
-        String statusFilePath = carbonTablePath.getTableStatusFilePath();
+        String statusFilePath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
 
         LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
@@ -617,12 +611,10 @@ public final class CarbonDataMergerUtil {
       // variable to store one  segment size across partition.
       long sizeOfOneSegmentAcrossPartition;
       if (segment.getSegmentFile() != null) {
-        CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
-        sizeOfOneSegmentAcrossPartition = CarbonUtil
-            .getSizeOfSegment(carbonTablePath, new Segment(segId, segment.getSegmentFile()));
+        sizeOfOneSegmentAcrossPartition = CarbonUtil.getSizeOfSegment(
+            carbonTable.getTablePath(), new Segment(segId, segment.getSegmentFile()));
       } else {
-        sizeOfOneSegmentAcrossPartition = getSizeOfSegment(tablePath, tableIdentifier, segId);
+        sizeOfOneSegmentAcrossPartition = getSizeOfSegment(carbonTable.getTablePath(), segId);
       }
 
       // if size of a segment is greater than the Major compaction size. then ignore it.
@@ -662,35 +654,17 @@ public final class CarbonDataMergerUtil {
   /**
    * For calculating the size of the specified segment
    * @param tablePath the store path of the segment
-   * @param tableIdentifier identifier of table that the segment belong to
    * @param segId segment id
    * @return the data size of the segment
    */
-  private static long getSizeOfSegment(String tablePath,
-      CarbonTableIdentifier tableIdentifier, String segId) {
-    String loadPath = getStoreLocation(tablePath, tableIdentifier, segId);
+  private static long getSizeOfSegment(String tablePath, String segId) {
+    String loadPath = CarbonTablePath.getSegmentPath(tablePath, segId);
     CarbonFile segmentFolder =
         FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
     return getSizeOfFactFileInLoad(segmentFolder);
   }
 
   /**
-   * This method will get the store location for the given path, segemnt id and partition id
-   *
-   * @param tablePath
-   * @param carbonTableIdentifier identifier of catbon table that the segment belong to
-   * @param segmentId segment id
-   * @return the store location of the segment
-   */
-  private static String getStoreLocation(String tablePath,
-      CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier);
-    return carbonTablePath.getCarbonDataDirectoryPath(segmentId);
-  }
-
-
-  /**
    * Identify the segments to be merged based on the segment count
    *
    * @param listOfSegmentsAfterPreserve the list of segments after
@@ -1033,21 +1007,19 @@ public final class CarbonDataMergerUtil {
    * if UpdateDelta Files are more than IUD Compaction threshold.
    *
    * @param seg
-   * @param absoluteTableIdentifier
+   * @param identifier
    * @param segmentUpdateStatusManager
    * @param numberDeltaFilesThreshold
    * @return
    */
   public static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
-      AbsoluteTableIdentifier absoluteTableIdentifier,
-      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
+      AbsoluteTableIdentifier identifier, SegmentUpdateStatusManager segmentUpdateStatusManager,
+      int numberDeltaFilesThreshold) {
 
     CarbonFile[] updateDeltaFiles = null;
     Set<String> uniqueBlocks = new HashSet<String>();
 
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(seg.getSegmentNo());
+    String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), seg.getSegmentNo());
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
     CarbonFile[] allSegmentFiles = segDir.listFiles();
@@ -1295,15 +1267,12 @@ public final class CarbonDataMergerUtil {
     CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true);
 
     // Update the Table Status.
-    String metaDataFilepath = table.getMetaDataFilepath();
-    AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier);
+    String metaDataFilepath = table.getMetadataPath();
+    AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
 
-    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+    String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
 
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
 
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
@@ -1317,7 +1286,7 @@ public final class CarbonDataMergerUtil {
                         + " for table status updation");
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-                segmentStatusManager.readLoadMetadata(metaDataFilepath);
+                SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
         for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
           if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
@@ -1326,7 +1295,7 @@ public final class CarbonDataMergerUtil {
           }
         }
         try {
-          segmentStatusManager
+          SegmentStatusManager
                   .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
         } catch (IOException e) {
           return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index a7b1711..324df2a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -413,8 +413,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
             tempStoreLocation, carbonStoreLocation);
-    setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable,
-        carbonFactDataHandlerModel);
+    setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel);
     dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,
         CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 569e026..4aca13a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -83,8 +83,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
             tempStoreLocation, carbonStoreLocation);
-    setDataFileAttributesInModel(loadModel, compactionType, carbonTable,
-        carbonFactDataHandlerModel);
+    setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 732a7e8..5062a78 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -337,9 +336,8 @@ public class CarbonFactDataHandlerModel {
       return configuration.getDataWritePath();
     }
     AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String carbonDataDirectoryPath = carbonTablePath
-        .getCarbonDataDirectoryPath(configuration.getSegmentId() + "");
+    String carbonDataDirectoryPath = CarbonTablePath
+        .getSegmentPath(absoluteTableIdentifier.getTablePath(), configuration.getSegmentId() + "");
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 8a6cbe4..64e50b0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -44,7 +44,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -148,9 +147,7 @@ public final class CarbonDataProcessorUtil {
 
     for (int i = 0 ; i < baseTmpStorePathArray.length; i++) {
       String tmpStore = baseTmpStorePathArray[i];
-      CarbonTablePath carbonTablePath =
-          CarbonStorePath.getCarbonTablePath(tmpStore, carbonTable.getCarbonTableIdentifier());
-      String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+      String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(tmpStore, segmentId);
 
       localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + taskId;
     }
@@ -399,14 +396,10 @@ public final class CarbonDataProcessorUtil {
   public static String createCarbonStoreLocation(String factStoreLocation,
       String databaseName, String tableName, String segmentId) {
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
-    CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier);
-    String carbonDataDirectoryPath =
-        carbonTablePath.getCarbonDataDirectoryPath(segmentId);
-    return carbonDataDirectoryPath;
+    return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
   }
 
+
   /**
    * initialise data type for measures for their storage format
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index d958937..276c3fe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -46,7 +46,6 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -56,7 +55,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -76,11 +74,8 @@ public final class CarbonLoaderUtil {
   }
 
   public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
-
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(currentLoad + "");
+    String segmentPath = CarbonTablePath.getSegmentPath(
+        loadModel.getTablePath(), currentLoad + "");
     deleteStorePath(segmentPath);
   }
 
@@ -93,33 +88,26 @@ public final class CarbonLoaderUtil {
    */
   public static boolean isValidSegment(CarbonLoadModel loadModel,
       int currentLoad) {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
-        .getCarbonTable();
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
-        loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
 
     int fileCount = 0;
-    int partitionCount = carbonTable.getPartitionCount();
-    for (int i = 0; i < partitionCount; i++) {
-      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(
-          currentLoad + "");
-      CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
-          FileFactory.getFileType(segmentPath));
-      CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
-
-        @Override
-        public boolean accept(CarbonFile file) {
-          return file.getName().endsWith(
-              CarbonTablePath.getCarbonIndexExtension())
-              || file.getName().endsWith(
-              CarbonTablePath.getCarbonDataExtension());
-        }
-
-      });
-      fileCount += files.length;
-      if (files.length > 0) {
-        return true;
+    String segmentPath = CarbonTablePath.getSegmentPath(
+        loadModel.getTablePath(), currentLoad + "");
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
+        FileFactory.getFileType(segmentPath));
+    CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+
+      @Override
+      public boolean accept(CarbonFile file) {
+        return file.getName().endsWith(
+            CarbonTablePath.getCarbonIndexExtension())
+            || file.getName().endsWith(
+            CarbonTablePath.getCarbonDataExtension());
       }
+
+    });
+    fileCount += files.length;
+    if (files.length > 0) {
+      return true;
     }
     if (fileCount == 0) {
       return false;
@@ -185,21 +173,21 @@ public final class CarbonLoaderUtil {
       CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid,
       List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated) throws IOException {
     boolean status = false;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String metadataPath = carbonTablePath.getMetadataDirectoryPath();
+    String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath());
     FileType fileType = FileFactory.getFileType(metadataPath);
     if (!FileFactory.isFileExist(metadataPath, fileType)) {
       FileFactory.mkdirs(metadataPath, fileType);
     }
     String tableStatusPath;
     if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !uuid.isEmpty()) {
-      tableStatusPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid);
+      tableStatusPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+          identifier.getTablePath(), uuid);
     } else {
-      tableStatusPath = carbonTablePath.getTableStatusFilePath();
+      tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
     }
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
     int retryCount = CarbonLockUtil
         .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
@@ -213,7 +201,8 @@ public final class CarbonLoaderUtil {
             "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
                 + " for table status updation");
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-            SegmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+            SegmentStatusManager.readLoadMetadata(
+                CarbonTablePath.getMetadataPath(identifier.getTablePath()));
         List<LoadMetadataDetails> listOfLoadFolderDetails =
             new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
         List<CarbonFile> staleFolders = new ArrayList<>();
@@ -240,12 +229,12 @@ public final class CarbonLoaderUtil {
           for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
             if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
                 && SegmentStatusManager.isLoadInProgress(
-                    absoluteTableIdentifier, entry.getLoadName())) {
+                    identifier, entry.getLoadName())) {
               throw new RuntimeException("Already insert overwrite is in progress");
             } else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
                 && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
                 && SegmentStatusManager.isLoadInProgress(
-                    absoluteTableIdentifier, entry.getLoadName())) {
+                    identifier, entry.getLoadName())) {
               throw new RuntimeException("Already insert into or load is in progress");
             }
           }
@@ -270,7 +259,7 @@ public final class CarbonLoaderUtil {
                 entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
                 // For insert overwrite, we will delete the old segment folder immediately
                 // So collect the old segments here
-                addToStaleFolders(carbonTablePath, staleFolders, entry);
+                addToStaleFolders(identifier, staleFolders, entry);
               }
             }
           }
@@ -283,7 +272,7 @@ public final class CarbonLoaderUtil {
         // when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE
         // so empty segment folder should be deleted
         if (newMetaEntry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) {
-          addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry);
+          addToStaleFolders(identifier, staleFolders, newMetaEntry);
         }
 
         for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
@@ -382,9 +371,10 @@ public final class CarbonLoaderUtil {
     return status;
   }
 
-  private static void addToStaleFolders(CarbonTablePath carbonTablePath,
+  private static void addToStaleFolders(AbsoluteTableIdentifier identifier,
       List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
-    String path = carbonTablePath.getCarbonDataDirectoryPath(entry.getLoadName());
+    String path = CarbonTablePath.getSegmentPath(
+        identifier.getTablePath(), entry.getLoadName());
     // add to the deletion list only if file exist else HDFS file system will throw
     // exception while deleting the file if file path does not exist
     if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
@@ -410,11 +400,9 @@ public final class CarbonLoaderUtil {
     loadMetadataDetails.setLoadStartTime(loadStartTime);
   }
 
-  public static void writeLoadMetadata(AbsoluteTableIdentifier absoluteTableIdentifier,
+  public static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
       List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+    String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
 
     DataOutputStream dataOutputStream;
     Gson gsonObjectToWrite = new Gson();
@@ -962,10 +950,8 @@ public final class CarbonLoaderUtil {
    * This method will get the store location for the given path, segment id and partition id
    */
   public static void checkAndCreateCarbonDataLocation(String segmentId, CarbonTable carbonTable) {
-    CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), carbonTableIdentifier);
-    String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+    String segmentFolder = CarbonTablePath.getSegmentPath(
+        carbonTable.getTablePath(), segmentId);
     CarbonUtil.checkAndCreateFolder(segmentFolder);
   }
 
@@ -994,9 +980,8 @@ public final class CarbonLoaderUtil {
    */
   public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
       String segmentId, CarbonTable carbonTable) throws IOException {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier()));
-    Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTablePath,
+    Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(
+        carbonTable.getTablePath(),
         new Segment(segmentId, loadMetadataDetails.getSegmentFile()));
     Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
     loadMetadataDetails.setDataSize(String.valueOf(dataSize));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index 288cd54..c00cc86 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 public final class DeleteLoadFolders {
@@ -50,15 +49,14 @@ public final class DeleteLoadFolders {
   /**
    * returns segment path
    *
-   * @param absoluteTableIdentifier
+   * @param identifier
    * @param oneLoad
    * @return
    */
-  private static String getSegmentPath(AbsoluteTableIdentifier absoluteTableIdentifier,
+  private static String getSegmentPath(AbsoluteTableIdentifier identifier,
       LoadMetadataDetails oneLoad) {
-    CarbonTablePath carbon = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     String segmentId = oneLoad.getLoadName();
-    return carbon.getCarbonDataDirectoryPath(segmentId);
+    return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
   }
 
   public static void physicalFactAndMeasureMetadataDeletion(


Mime
View raw message