carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [2/3] carbondata git commit: [CARBONDATA-1844] Add tablePath support when creating table
Date Mon, 04 Dec 2017 06:36:12 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
index ab77fba..b305fa9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
@@ -27,7 +27,8 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
  * @param ifExistsSet
  * @param sparkSession
  */
-case class DropTablePreEvent(carbonTable: Option[CarbonTable],
+case class DropTablePreEvent(
+    carbonTable: CarbonTable,
     ifExistsSet: Boolean,
     sparkSession: SparkSession)
   extends Event with DropTableEventInfo
@@ -39,7 +40,8 @@ case class DropTablePreEvent(carbonTable: Option[CarbonTable],
  * @param ifExistsSet
  * @param sparkSession
  */
-case class DropTablePostEvent(carbonTable: Option[CarbonTable],
+case class DropTablePostEvent(
+    carbonTable: CarbonTable,
     ifExistsSet: Boolean,
     sparkSession: SparkSession)
   extends Event with DropTableEventInfo
@@ -51,7 +53,8 @@ case class DropTablePostEvent(carbonTable: Option[CarbonTable],
  * @param ifExistsSet
  * @param sparkSession
  */
-case class DropTableAbortEvent(carbonTable: Option[CarbonTable],
+case class DropTableAbortEvent(
+    carbonTable: CarbonTable,
     ifExistsSet: Boolean,
     sparkSession: SparkSession)
   extends Event with DropTableEventInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 9c542b6..6279fca 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.events
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel}
 
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 
@@ -35,7 +35,7 @@ trait DatabaseEventInfo {
  * event for table related operations
  */
 trait TableEventInfo {
-  val carbonTableIdentifier: CarbonTableIdentifier
+  val identifier: AbsoluteTableIdentifier
 }
 
 /**
@@ -57,7 +57,7 @@ trait LookupRelationEventInfo {
  * event for drop table
  */
 trait DropTableEventInfo {
-  val carbonTable: Option[CarbonTable]
+  val carbonTable: CarbonTable
   val ifExistsSet: Boolean
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 4500221..594ea0e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -17,21 +17,17 @@
 
 package org.apache.carbondata.spark
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 
 /**
  * Contains all options for Spark data source
  */
 class CarbonOption(options: Map[String, String]) {
-  def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
 
-  def dbName: String = options.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
+  def dbName: Option[String] = options.get("dbName")
 
   def tableName: String = options.getOrElse("tableName", "default_table")
 
-  def tablePath: String = s"$dbName/$tableName"
-
-  def tableId: String = options.getOrElse("tableId", "default_table_id")
+  def tablePath: Option[String] = options.get("tablePath")
 
   def partitionCount: String = options.getOrElse("partitionCount", "1")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 3f47a47..44fc7ad 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -505,7 +505,6 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
  * @param sparkContext    spark context
  * @param table           carbon table identifier
  * @param dimensions      carbon dimenisons having predefined dict
- * @param hdfsLocation    carbon base store path
  * @param dictFolderPath  path of dictionary folder
  */
 class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
@@ -513,7 +512,6 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     sparkContext: SparkContext,
     table: CarbonTableIdentifier,
     dimensions: Array[CarbonDimension],
-    hdfsLocation: String,
     dictFolderPath: String)
   extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 50005c8..2f190b5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -53,8 +53,8 @@ class CarbonIUDMergerRDD[K, V](
 
   override def getPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
-    val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
-      hdfsStoreLocation, new CarbonTableIdentifier(databaseName, factTableName, tableId)
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+      tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 997838c..82b2a57 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -67,7 +67,7 @@ class CarbonMergerRDD[K, V](
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   var storeLocation: String = null
   var mergeResult: String = null
-  val hdfsStoreLocation = carbonMergerMapping.hdfsStoreLocation
+  val tablePath = carbonMergerMapping.hdfsStoreLocation
   val metadataFilePath = carbonMergerMapping.metadataFilePath
   val mergedLoadName = carbonMergerMapping.mergedLoadName
   val databaseName = carbonMergerMapping.databaseName
@@ -167,7 +167,7 @@ class CarbonMergerRDD[K, V](
         val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
           CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
 
-        carbonLoadModel.setTablePath(hdfsStoreLocation)
+        carbonLoadModel.setTablePath(tablePath)
         // check for restructured block
         // TODO: only in case of add and drop this variable should be true
         val restructuredBlockExists: Boolean = CarbonCompactionUtil
@@ -260,8 +260,8 @@ class CarbonMergerRDD[K, V](
 
   override def getPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
-    val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
-      hdfsStoreLocation, new CarbonTableIdentifier(databaseName, factTableName, tableId)
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+      tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
     val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
       absoluteTableIdentifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 11e5baf..7316574 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -330,9 +330,7 @@ class CarbonScanRDD(
   private def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
     CarbonTableInputFormat.setTableInfo(conf, tableInfo)
     CarbonTableInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
-    CarbonTableInputFormat
-      .setTableName(conf,
-        tableInfo.getOrCreateAbsoluteTableIdentifier().getCarbonTableIdentifier.getTableName)
+    CarbonTableInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
     createInputFormat(conf)
   }
 
@@ -341,9 +339,7 @@ class CarbonScanRDD(
     val tableInfo1 = getTableInfo
     CarbonTableInputFormat.setTableInfo(conf, tableInfo1)
     CarbonTableInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
-    CarbonTableInputFormat
-      .setTableName(conf,
-        tableInfo1.getOrCreateAbsoluteTableIdentifier().getCarbonTableIdentifier.getTableName)
+    CarbonTableInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
     CarbonTableInputFormat.setDataTypeConverter(conf, new SparkDataTypeConverterImpl)
     createInputFormat(conf)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
index e304d84..08d635b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
 
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.util.DataTypeUtil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index af1d3f8..3595884 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
@@ -529,7 +529,7 @@ object CommonUtil {
     CarbonLoaderUtil.populateNewLoadMetaEntry(
       newLoadMetaEntry, status, model.getFactTimeStamp, false)
     val entryAdded: Boolean =
-      CarbonLoaderUtil.recordLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite)
+      CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite)
     if (!entryAdded) {
       sys.error(s"Failed to add entry in table status for " +
                 s"${ model.getDatabaseName }.${model.getTableName}")
@@ -550,7 +550,7 @@ object CommonUtil {
     val loadMetaEntry = model.getLoadMetadataDetails.get(model.getLoadMetadataDetails.size - 1)
     CarbonLoaderUtil
       .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp, true)
-    val updationStatus = CarbonLoaderUtil.recordLoadMetadata(loadMetaEntry, model, false, false)
+    val updationStatus = CarbonLoaderUtil.recordNewLoadMetadata(loadMetaEntry, model, false, false)
     if (!updationStatus) {
       sys
         .error(s"Failed to update failure entry in table status for ${

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index f6170e8..ccbc9f5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -292,14 +292,13 @@ object GlobalDictionaryUtil {
    * @param carbonLoadModel carbon load model
    * @param table           CarbonTableIdentifier
    * @param dimensions      column list
-   * @param hdfsLocation    store location in HDFS
-   * @param dictfolderPath  path of dictionary folder
+   * @param dictFolderPath  path of dictionary folder
    */
-  def createDictionaryLoadModel(carbonLoadModel: CarbonLoadModel,
+  def createDictionaryLoadModel(
+      carbonLoadModel: CarbonLoadModel,
       table: CarbonTableIdentifier,
       dimensions: Array[CarbonDimension],
-      hdfsLocation: String,
-      dictfolderPath: String,
+      dictFolderPath: String,
       forPreDefDict: Boolean): DictionaryLoadModel = {
     val primDimensionsBuffer = new ArrayBuffer[CarbonDimension]
     val isComplexes = new ArrayBuffer[Boolean]
@@ -312,7 +311,7 @@ object GlobalDictionaryUtil {
     }
     val primDimensions = primDimensionsBuffer.map { x => x }.toArray
     val dictDetail = CarbonSparkFactory.getDictionaryDetailService.
-      getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
+      getDictionaryDetail(dictFolderPath, primDimensions, table, carbonLoadModel.getTablePath)
     val dictFilePaths = dictDetail.dictFilePaths
     val dictFileExists = dictDetail.dictFileExists
     val columnIdentifier = dictDetail.columnIdentifiers
@@ -327,11 +326,12 @@ object GlobalDictionaryUtil {
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel)
     }
-    val absoluteTableIdentifier = new AbsoluteTableIdentifier(hdfsLocation, table)
-    DictionaryLoadModel(absoluteTableIdentifier,
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath, table)
+    DictionaryLoadModel(
+      absoluteTableIdentifier,
       dimensions,
-      hdfsLocation,
-      dictfolderPath,
+      carbonLoadModel.getTablePath,
+      dictFolderPath,
       dictFilePaths,
       dictFileExists,
       isComplexes.toArray,
@@ -505,7 +505,6 @@ object GlobalDictionaryUtil {
    * @param dimensions      dimension column
    * @param carbonLoadModel carbon load model
    * @param sqlContext      spark sql context
-   * @param hdfsLocation    store location on hdfs
    * @param dictFolderPath  generated global dict file path
    */
   def generatePredefinedColDictionary(colDictFilePath: String,
@@ -513,15 +512,14 @@ object GlobalDictionaryUtil {
       dimensions: Array[CarbonDimension],
       carbonLoadModel: CarbonLoadModel,
       sqlContext: SQLContext,
-      hdfsLocation: String,
       dictFolderPath: String): Unit = {
     // set pre defined dictionary column
     setPredefinedColumnDictPath(carbonLoadModel, colDictFilePath, table, dimensions)
     val dictLoadModel = createDictionaryLoadModel(carbonLoadModel, table, dimensions,
-      hdfsLocation, dictFolderPath, forPreDefDict = true)
+      dictFolderPath, forPreDefDict = true)
     // new RDD to achieve distributed column dict generation
     val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel,
-      sqlContext.sparkContext, table, dimensions, hdfsLocation, dictFolderPath)
+      sqlContext.sparkContext, table, dimensions, dictFolderPath)
       .partitionBy(new ColumnPartitioner(dictLoadModel.primDimensions.length))
     val statusList = new CarbonGlobalDictionaryGenerateRDD(extInputRDD, dictLoadModel).collect()
     // check result status
@@ -681,15 +679,16 @@ object GlobalDictionaryUtil {
    * @param sqlContext      sql context
    * @param carbonLoadModel carbon load model
    */
-  def generateGlobalDictionary(sqlContext: SQLContext,
+  def generateGlobalDictionary(
+      sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      tablePath: String,
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame] = None): Unit = {
     try {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
       val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
       // create dictionary folder if not exists
+      val tablePath = carbonLoadModel.getTablePath
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
       val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
       // columns which need to generate global dictionary file
@@ -709,7 +708,7 @@ object GlobalDictionaryUtil {
         if (colDictFilePath != null) {
           // generate predefined dictionary
           generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
-            dimensions, carbonLoadModel, sqlContext, tablePath, dictfolderPath)
+            dimensions, carbonLoadModel, sqlContext, dictfolderPath)
         }
         if (headers.length > df.columns.length) {
           val msg = "The number of columns in the file header do not match the " +
@@ -725,7 +724,7 @@ object GlobalDictionaryUtil {
           // select column to push down pruning
           df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
           val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
-            requireDimension, tablePath, dictfolderPath, false)
+            requireDimension, dictfolderPath, false)
           // combine distinct value in a block and partition by column
           val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
             .partitionBy(new ColumnPartitioner(model.primDimensions.length))
@@ -737,9 +736,9 @@ object GlobalDictionaryUtil {
           LOGGER.info("No column found for generating global dictionary in source data files")
         }
       } else {
-        generateDictionaryFromDictionaryFiles(sqlContext,
+        generateDictionaryFromDictionaryFiles(
+          sqlContext,
           carbonLoadModel,
-          tablePath,
           carbonTableIdentifier,
           dictfolderPath,
           dimensions,
@@ -764,11 +763,11 @@ object GlobalDictionaryUtil {
     }
   }
 
-  def generateDictionaryFromDictionaryFiles(sqlContext: SQLContext,
+  def generateDictionaryFromDictionaryFiles(
+      sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      storePath: String,
       carbonTableIdentifier: CarbonTableIdentifier,
-      dictfolderPath: String,
+      dictFolderPath: String,
       dimensions: Array[CarbonDimension],
       allDictionaryPath: String): Unit = {
     LOGGER.info("Generate global dictionary from dictionary files!")
@@ -781,7 +780,7 @@ object GlobalDictionaryUtil {
       val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, headers)
       if (requireDimension.nonEmpty) {
         val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
-          requireDimension, storePath, dictfolderPath, false)
+          requireDimension, dictFolderPath, false)
         // check if dictionary files contains bad record
         val accumulator = sqlContext.sparkContext.accumulator(0)
         // read local dictionary file, and group by key

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 8048b66..4ad939c 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -245,7 +245,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       tableName: String,
       fields: Seq[Field],
       partitionCols: Seq[PartitionerField],
-      tableProperties: mutable.Map[String, String],
+      tableProperties: Map[String, String],
       bucketFields: Option[BucketFields],
       isAlterFlow: Boolean = false,
       tableComment: Option[String] = None): TableModel = {
@@ -276,10 +276,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
     TableModel(
       ifNotExistPresent,
-      dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
       dbName,
       tableName,
-      tableProperties,
+      tableProperties.toMap,
       reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
       msrs.map(f => normalizeType(f)),
       Option(sortKeyDims),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 844f6f7..44f577d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -21,7 +21,6 @@ import java.util
 import java.util.UUID
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.Map
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
@@ -30,7 +29,7 @@ import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
@@ -48,7 +47,6 @@ import org.apache.carbondata.spark.util.DataTypeConverterUtil
 
 case class TableModel(
     ifNotExistsSet: Boolean,
-    var databaseName: String,
     databaseNameOp: Option[String],
     tableName: String,
     tableProperties: Map[String, String],
@@ -58,8 +56,7 @@ case class TableModel(
     highcardinalitydims: Option[Seq[String]],
     noInvertedIdxCols: Option[Seq[String]],
     columnGroups: Seq[String],
-    colProps: Option[util.Map[String,
-    util.List[ColumnProperty]]] = None,
+    colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None,
     bucketFields: Option[BucketFields],
     partitionInfo: Option[PartitionInfo],
     tableComment: Option[String] = None,
@@ -322,8 +319,14 @@ class AlterTableColumnSchemaGenerator(
 
 // TODO: move this to carbon store API
 object TableNewProcessor {
-  def apply(cm: TableModel): TableInfo = {
-    new TableNewProcessor(cm).process
+  def apply(
+      cm: TableModel,
+      identifier: AbsoluteTableIdentifier): TableInfo = {
+    new TableNewProcessor(
+      cm,
+      identifier.getDatabaseName,
+      identifier.getTableName,
+      identifier.getTablePath).process
   }
 
   def createColumnSchema(
@@ -356,7 +359,7 @@ object TableNewProcessor {
     }
     columnSchema.setEncodingList(encoders)
     val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
-    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(databaseName, columnSchema)
+    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
     columnSchema.setColumnUniqueId(columnUniqueId)
     columnSchema.setColumnReferenceId(columnUniqueId)
     columnSchema.setColumnar(true)
@@ -370,7 +373,7 @@ object TableNewProcessor {
   }
 }
 
-class TableNewProcessor(cm: TableModel) {
+class TableNewProcessor(cm: TableModel, dbName: String, tableName: String, tablePath: String) {
 
   def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
     var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
@@ -420,8 +423,7 @@ class TableNewProcessor(cm: TableModel) {
     }
     columnSchema.setEncodingList(encoders)
     val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
-    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName,
-      columnSchema)
+    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
     columnSchema.setColumnUniqueId(columnUniqueId)
     columnSchema.setColumnReferenceId(columnUniqueId)
     columnSchema.setDimensionColumn(isDimensionCol)
@@ -529,7 +531,6 @@ class TableNewProcessor(cm: TableModel) {
         LOGGER.error(s"Duplicate column found with name: $name")
         LOGGER.audit(
           s"Validation failed for Create/Alter Table Operation " +
-          s"for ${ cm.databaseName }.${ cm.tableName }" +
           s"Duplicate column found with name: $name")
         CarbonException.analysisException(s"Duplicate dimensions found with name: $name")
       }
@@ -621,11 +622,12 @@ class TableNewProcessor(cm: TableModel) {
       partitionInfo.setColumnSchemaList(partitionCols)
       tableSchema.setPartitionInfo(partitionInfo)
     }
-    tableSchema.setTableName(cm.tableName)
+    tableSchema.setTableName(tableName)
     tableSchema.setListOfColumns(allColumns.asJava)
     tableSchema.setSchemaEvalution(schemaEvol)
-    tableInfo.setDatabaseName(cm.databaseName)
-    tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(cm.databaseName, cm.tableName))
+    tableInfo.setTablePath(tablePath)
+    tableInfo.setDatabaseName(dbName)
+    tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName))
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
     tableInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 76b9e30..6864495 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -54,7 +54,8 @@ object CarbonReflectionUtils {
       .map(l => im.reflectField(l.asTerm).get).getOrElse(null)
   }
 
-  def getUnresolvedRelation(tableIdentifier: TableIdentifier,
+  def getUnresolvedRelation(
+      tableIdentifier: TableIdentifier,
       version: String,
       tableAlias: Option[String] = None): UnresolvedRelation = {
     val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 145068f..f1b9ecd 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -286,21 +286,17 @@ object CarbonDataRDDFactory {
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame] = None,
       updateModel: Option[UpdateTableModel] = None): Unit = {
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val operationContext = new OperationContext
-    // for handling of the segment Merging.
-
     LOGGER.audit(s"Data load request has been received for table" +
                  s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     // Check if any load need to be deleted before loading new data
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     DataManagementFunc.deleteLoadsAndUpdateMetadata(isForceDeletion = false, carbonTable)
     var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
     var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
 
     // create new segment folder  in carbon store
     if (updateModel.isEmpty) {
-      CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
-        carbonLoadModel.getSegmentId, carbonTable)
+      CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, carbonTable)
     }
     var loadStatus = SegmentStatus.SUCCESS
     var errorMessage: String = "DataLoad failure"
@@ -769,7 +765,7 @@ object CarbonDataRDDFactory {
       true)
     CarbonUtil
       .addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable)
-    val done = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, carbonLoadModel, false,
+    val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
       overwriteTable)
     if (!done) {
       val errorMessage = "Dataload failed due to failure in table status updation."

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 4f2cf14..d433470 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -70,48 +70,6 @@ case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attr
 
 case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
 
-object GetDB {
-
-  def getDatabaseName(dbName: Option[String], sparkSession: SparkSession): String = {
-    dbName.getOrElse(
-      sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCurrentDatabase)
-  }
-
-  /**
-   * The method returns the database location
-   * if carbon.storeLocation does  point to spark.sql.warehouse.dir then returns
-   * the database locationUri as database location else follows the old behaviour
-   * making database location from carbon fixed store and database name.
-   *
-   * @param dbName
-   * @param sparkSession
-   * @param fixedStorePath
-   * @return
-   */
-  def getDatabaseLocation(dbName: String, sparkSession: SparkSession,
-      fixedStorePath: String): String = {
-    var databaseLocation =
-      sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getDatabaseMetadata(dbName)
-        .locationUri.toString
-    // for default database and db ends with .db
-    // check whether the carbon store and hive store is same or different.
-    if (dbName.equals("default") || databaseLocation.endsWith(".db")) {
-      val properties = CarbonProperties.getInstance()
-      val carbonStorePath = FileFactory
-        .getUpdatedFilePath(properties.getProperty(CarbonCommonConstants.STORE_LOCATION))
-      val hiveStorePath = FileFactory
-        .getUpdatedFilePath(sparkSession.conf.get("spark.sql.warehouse.dir"))
-      // if carbon.store does not point to spark.sql.warehouse.dir then follow the old table path
-      // format
-      if (!hiveStorePath.equals(carbonStorePath)) {
-        databaseLocation = fixedStorePath + CarbonCommonConstants.FILE_SEPARATOR + dbName
-      }
-    }
-
-    return FileFactory.getUpdatedFilePath(databaseLocation)
-  }
-}
-
 case class ProjectForUpdate(
     table: UnresolvedRelation,
     columns: List[String],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 71590dd..ca371e1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -63,9 +63,13 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
     val storePath = CarbonProperties.getStorePath
     val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
       .append("tempCSV")
-      .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
-      .append(CarbonCommonConstants.UNDERSCORE).append(options.tableName)
-      .append(CarbonCommonConstants.UNDERSCORE).append(System.nanoTime()).toString
+      .append(CarbonCommonConstants.UNDERSCORE)
+      .append(CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession))
+      .append(CarbonCommonConstants.UNDERSCORE)
+      .append(options.tableName)
+      .append(CarbonCommonConstants.UNDERSCORE)
+      .append(System.nanoTime())
+      .toString
     writeToTempCSVFile(tempCSVFolder, options)
 
     val tempCSVPath = new Path(tempCSVFolder)
@@ -133,7 +137,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
   private def loadDataFrame(options: CarbonOption): Unit = {
     val header = dataFrame.columns.mkString(",")
     CarbonLoadDataCommand(
-      Some(options.dbName),
+      Some(CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)),
       options.tableName,
       null,
       Seq(),
@@ -168,18 +172,21 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       "DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
       "TABLE_BLOCKSIZE" -> options.tableBlockSize
     ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
+    val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
     s"""
-       | CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
+       | CREATE TABLE IF NOT EXISTS $dbName.${options.tableName}
        | (${ carbonSchema.mkString(", ") })
        | STORED BY 'carbondata'
        | ${ if (property.nonEmpty) "TBLPROPERTIES (" + property + ")" else "" }
+       | ${ if (options.tablePath.nonEmpty) s"LOCATION '${options.tablePath.get}'" else ""}
      """.stripMargin
   }
 
   private def makeLoadString(csvFolder: String, options: CarbonOption): String = {
+    val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
     s"""
        | LOAD DATA INPATH '$csvFolder'
-       | INTO TABLE ${options.dbName}.${options.tableName}
+       | INTO TABLE $dbName.${options.tableName}
        | OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}',
        | 'SINGLE_PASS' = '${options.singlePass}')
      """.stripMargin

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 31ff323..57233cf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -46,9 +46,10 @@ case class CarbonDatasourceHadoopRelation(
   extends BaseRelation with InsertableRelation {
 
   var caseInsensitiveMap = parameters.map(f => (f._1.toLowerCase, f._2))
-  lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(paths.head,
-    caseInsensitiveMap.getOrElse("dbname", GetDB.getDatabaseName(None, sparkSession)),
-    caseInsensitiveMap.get("tablename").get)
+  lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+    paths.head,
+    CarbonEnv.getDatabaseName(caseInsensitiveMap.get("dbname"))(sparkSession),
+    caseInsensitiveMap("tablename"))
   lazy val databaseName: String = carbonTable.getDatabaseName
   lazy val tableName: String = carbonTable.getTableName
   CarbonSession.updateSessionInfoToCurrentThread(sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 1a336c4..771a235 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -504,9 +504,9 @@ class CarbonDecoderRDD(
   }
 
   override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    val absoluteTableIdentifiers = relations.map { relation =>
-      val tableInfo = getTableInfo
-      (tableInfo.getFactTable.getTableName, tableInfo.getOrCreateAbsoluteTableIdentifier)
+    val tableInfo = getTableInfo
+    val absoluteTableIdentifiers = relations.map { _ =>
+      (tableInfo.getFactTable.getTableName, tableInfo.getOrCreateAbsoluteTableIdentifier())
     }.toMap
 
     val cacheProvider: CacheProvider = CacheProvider.getInstance

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index c03b210..811442b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,18 +17,18 @@
 
 package org.apache.spark.sql
 
-import java.util.Map
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonRelation, CarbonSessionCatalog, CarbonSQLConf}
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.hive._
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
@@ -99,8 +99,7 @@ class CarbonEnv {
 
 object CarbonEnv {
 
-  val carbonEnvMap: Map[SparkSession, CarbonEnv] =
-    new ConcurrentHashMap[SparkSession, CarbonEnv]
+  val carbonEnvMap = new ConcurrentHashMap[SparkSession, CarbonEnv]
 
   def getInstance(sparkSession: SparkSession): CarbonEnv = {
     if (sparkSession.isInstanceOf[CarbonSession]) {
@@ -117,18 +116,24 @@ object CarbonEnv {
   }
 
   /**
-   * Return carbon table instance by looking up table in `sparkSession`
+   * Return carbon table instance from cache or by looking up table in `sparkSession`
    */
   def getCarbonTable(
       databaseNameOp: Option[String],
       tableName: String)
     (sparkSession: SparkSession): CarbonTable = {
-    CarbonEnv
-      .getInstance(sparkSession)
-      .carbonMetastore
-      .lookupRelation(databaseNameOp, tableName)(sparkSession)
-      .asInstanceOf[CarbonRelation]
-      .carbonTable
+    val databaseName = getDatabaseName(databaseNameOp)(sparkSession)
+    val catalog = getInstance(sparkSession).carbonMetastore
+    // refresh cache
+    catalog.checkSchemasModifiedTimeAndReloadTables()
+
+    // try to get it from cache, otherwise lookup in catalog
+    catalog.getTableFromMetadataCache(databaseName, tableName)
+      .getOrElse(
+        catalog
+          .lookupRelation(databaseNameOp, tableName)(sparkSession)
+          .asInstanceOf[CarbonRelation]
+          .carbonTable)
   }
 
   /**
@@ -137,12 +142,7 @@ object CarbonEnv {
   def getCarbonTable(
       tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): CarbonTable = {
-    CarbonEnv
-      .getInstance(sparkSession)
-      .carbonMetastore
-      .lookupRelation(tableIdentifier)(sparkSession)
-      .asInstanceOf[CarbonRelation]
-      .carbonTable
+    getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
   }
 
   /**
@@ -155,32 +155,62 @@ object CarbonEnv {
   }
 
   /**
-   * Return table path for specified table
+   * Return the database location. For the default database, or a location ending with
+   * ".db": if carbon.storeLocation points to spark.sql.warehouse.dir, the database
+   * locationUri is used; otherwise the old behaviour is followed, building the location
+   * from the carbon store path and the database name. Other locations are used as-is.
+   * @return database location
+   */
+  def getDatabaseLocation(dbName: String, sparkSession: SparkSession): String = {
+    var databaseLocation =
+      sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getDatabaseMetadata(dbName)
+        .locationUri.toString
+    // for default database and db ends with .db
+    // check whether the carbon store and hive store is same or different.
+    if (dbName.equals("default") || databaseLocation.endsWith(".db")) {
+      val properties = CarbonProperties.getInstance()
+      val carbonStorePath =
+        FileFactory.getUpdatedFilePath(properties.getProperty(CarbonCommonConstants.STORE_LOCATION))
+      val hiveStorePath =
+        FileFactory.getUpdatedFilePath(sparkSession.conf.get("spark.sql.warehouse.dir"))
+      // if carbon.store does not point to spark.sql.warehouse.dir then follow the old table path
+      // format
+      if (!hiveStorePath.equals(carbonStorePath)) {
+        databaseLocation = CarbonProperties.getStorePath +
+                           CarbonCommonConstants.FILE_SEPARATOR +
+                           dbName
+      }
+    }
+
+    FileFactory.getUpdatedFilePath(databaseLocation)
+  }
+
+  /**
+   * Return table path from carbon table. If table does not exist, construct it using
+   * database location and table name
    */
   def getTablePath(
       databaseNameOp: Option[String],
       tableName: String
   )(sparkSession: SparkSession): String = {
-    val dbLocation = GetDB.getDatabaseLocation(
-      databaseNameOp.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
-      sparkSession,
-      CarbonProperties.getStorePath)
-    dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    try {
+      getCarbonTable(databaseNameOp, tableName)(sparkSession).getTablePath
+    } catch {
+      case _: NoSuchTableException =>
+        val dbName = getDatabaseName(databaseNameOp)(sparkSession)
+        val dbLocation = getDatabaseLocation(dbName, sparkSession)
+        dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    }
   }
 
-  /**
-   * Return metadata path for specified table
-   */
-  def getMetadataPath(
+  def getIdentifier(
       databaseNameOp: Option[String],
       tableName: String
-  )(sparkSession: SparkSession): String = {
-    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+  )(sparkSession: SparkSession): AbsoluteTableIdentifier = {
+    AbsoluteTableIdentifier.from(
       getTablePath(databaseNameOp, tableName)(sparkSession),
       getDatabaseName(databaseNameOp)(sparkSession),
       tableName)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
-    CarbonTablePath.getFolderContainingFile(schemaFilePath)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index b764132..7fb146c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -40,7 +40,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory}
@@ -67,9 +66,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         None)
       case _ =>
         val options = new CarbonOption(parameters)
-        val storePath = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.STORE_LOCATION)
-        val tablePath = storePath + "/" + options.dbName + "/" + options.tableName
+        val tablePath =
+          CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession)
         CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
     }
   }
@@ -88,16 +86,17 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
                                           "specified when creating CarbonContext")
 
     val options = new CarbonOption(parameters)
-    val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
-    val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
+    val tablePath = new Path(
+      CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession))
     val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
       .exists(tablePath)
     val (doSave, doAppend) = (mode, isExists) match {
       case (SaveMode.ErrorIfExists, true) =>
-        CarbonException.analysisException(s"path $storePath already exists.")
+        CarbonException.analysisException(s"table path already exists.")
       case (SaveMode.Overwrite, true) =>
+        val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
         sqlContext.sparkSession
-          .sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
+          .sql(s"DROP TABLE IF EXISTS $dbName.${options.tableName}")
         (true, false)
       case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
         (true, false)
@@ -124,8 +123,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       dataSchema: StructType): BaseRelation = {
     CarbonEnv.getInstance(sqlContext.sparkSession)
     addLateDecodeOptimization(sqlContext.sparkSession)
-    val dbName: String = parameters.getOrElse("dbName",
-      CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+    val dbName: String =
+      CarbonEnv.getDatabaseName(parameters.get("dbName"))(sqlContext.sparkSession)
     val tableOption: Option[String] = parameters.get("tableName")
     if (tableOption.isEmpty) {
       CarbonException.analysisException("Table creation failed. Table name is not specified")
@@ -154,27 +153,27 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
   }
 
 
-  private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
-      dataSchema: StructType) = {
+  private def createTableIfNotExists(
+      sparkSession: SparkSession,
+      parameters: Map[String, String],
+      dataSchema: StructType): (String, Map[String, String]) = {
 
-    val dbName: String = parameters.getOrElse("dbName",
-      CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+    val dbName: String = CarbonEnv.getDatabaseName(parameters.get("dbName"))(sparkSession)
     val tableName: String = parameters.getOrElse("tableName", "").toLowerCase
 
     try {
-      if (parameters.contains("carbonSchemaPartsNo")) {
-        getPathForTable(sparkSession, dbName, tableName, parameters)
-      } else {
-        CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .lookupRelation(Option(dbName), tableName)(sparkSession)
-        (CarbonProperties.getStorePath + s"/$dbName/$tableName", parameters)
-      }
+      val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      (carbonTable.getTablePath, parameters)
     } catch {
-      case ex: NoSuchTableException =>
+      case _: NoSuchTableException =>
         val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-        val updatedParams =
-          CarbonSource.updateAndCreateTable(dataSchema, sparkSession, metaStore, parameters)
-        getPathForTable(sparkSession, dbName, tableName, updatedParams)
+        val identifier = AbsoluteTableIdentifier.from(
+          CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession),
+          dbName,
+          tableName)
+        val updatedParams = CarbonSource.updateAndCreateTable(
+          identifier, dataSchema, sparkSession, metaStore, parameters)
+        (CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), updatedParams)
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon table", ex)
     }
@@ -203,8 +202,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       } else if (!sparkSession.isInstanceOf[CarbonSession]) {
         (CarbonProperties.getStorePath + "/" + dbName + "/" + tableName, parameters)
       } else {
-        val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-        (carbonTable.getTablePath, parameters)
+        (CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), parameters)
       }
     } catch {
       case ex: Exception =>
@@ -222,54 +220,44 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       outputMode: OutputMode): Sink = {
 
     // check "tablePath" option
-    val tablePathOption = parameters.get("tablePath")
-    val dbName: String = parameters.getOrElse("dbName",
-      CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
-    val tableOption: Option[String] = parameters.get("tableName")
-    if (tableOption.isEmpty) {
-      throw new CarbonStreamException("Table creation failed. Table name is not specified")
-    }
-    val tableName = tableOption.get.toLowerCase()
+    val options = new CarbonOption(parameters)
+    val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
+    val tableName = options.tableName
     if (tableName.contains(" ")) {
       throw new CarbonStreamException("Table creation failed. Table name cannot contain blank " +
                                       "space")
     }
-    if (tablePathOption.isDefined) {
-      val sparkSession = sqlContext.sparkSession
-      val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-
-      if (!carbonTable.isStreamingTable) {
-        throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
-                                        s"${carbonTable.getTableName} is not a streaming table")
-      }
-
-      // create sink
-      StreamSinkFactory.createStreamTableSink(
-        sqlContext.sparkSession,
-        sqlContext.sparkSession.sessionState.newHadoopConf(),
-        carbonTable,
-        parameters)
-    } else {
-      throw new CarbonStreamException("Require tablePath option for the write stream")
+    val sparkSession = sqlContext.sparkSession
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+    if (!carbonTable.isStreamingTable) {
+      throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
+                                      s"${carbonTable.getTableName} is not a streaming table")
     }
+
+    // create sink
+    StreamSinkFactory.createStreamTableSink(
+      sqlContext.sparkSession,
+      sqlContext.sparkSession.sessionState.newHadoopConf(),
+      carbonTable,
+      parameters)
   }
 
 }
 
 object CarbonSource {
 
-  def createTableInfoFromParams(parameters: Map[String, String],
+  def createTableInfoFromParams(
+      parameters: Map[String, String],
       dataSchema: StructType,
-      dbName: String,
-      tableName: String): TableModel = {
+      identifier: AbsoluteTableIdentifier): TableModel = {
     val sqlParser = new CarbonSpark2SqlParser
     val fields = sqlParser.getFields(dataSchema)
     val map = scala.collection.mutable.Map[String, String]()
     parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
     val options = new CarbonOption(parameters)
     val bucketFields = sqlParser.getBucketFields(map, fields, options)
-    sqlParser.prepareTableModel(ifNotExistPresent = false, Option(dbName),
-      tableName, fields, Nil, map, bucketFields)
+    sqlParser.prepareTableModel(ifNotExistPresent = false, Option(identifier.getDatabaseName),
+      identifier.getTableName, fields, Nil, map, bucketFields)
   }
 
   /**
@@ -278,13 +266,23 @@ object CarbonSource {
    * @param sparkSession
    * @return
    */
-  def updateCatalogTableWithCarbonSchema(tableDesc: CatalogTable,
+  def updateCatalogTableWithCarbonSchema(
+      tableDesc: CatalogTable,
       sparkSession: SparkSession): CatalogTable = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val storageFormat = tableDesc.storage
     val properties = storageFormat.properties
     if (!properties.contains("carbonSchemaPartsNo")) {
-      val map = updateAndCreateTable(tableDesc.schema, sparkSession, metaStore, properties)
+      val tablePath = CarbonEnv.getTablePath(
+        tableDesc.identifier.database, tableDesc.identifier.table)(sparkSession)
+      val dbName = CarbonEnv.getDatabaseName(tableDesc.identifier.database)(sparkSession)
+      val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableDesc.identifier.table)
+      val map = updateAndCreateTable(
+        identifier,
+        tableDesc.schema,
+        sparkSession,
+        metaStore,
+        properties)
       // updating params
       val updatedFormat = storageFormat.copy(properties = map)
       tableDesc.copy(storage = updatedFormat)
@@ -292,7 +290,7 @@ object CarbonSource {
       val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
       if (!metaStore.isReadFromHiveMetaStore) {
         // save to disk
-        metaStore.saveToDisk(tableInfo, properties.get("tablePath").get)
+        metaStore.saveToDisk(tableInfo, properties("tablePath"))
         // remove schema string from map as we don't store carbon schema to hive metastore
         val map = CarbonUtil.removeSchemaFromMap(properties.asJava)
         val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
@@ -303,36 +301,26 @@ object CarbonSource {
     }
   }
 
-  def updateAndCreateTable(dataSchema: StructType,
+  def updateAndCreateTable(
+      identifier: AbsoluteTableIdentifier,
+      dataSchema: StructType,
       sparkSession: SparkSession,
       metaStore: CarbonMetaStore,
       properties: Map[String, String]): Map[String, String] = {
-    val dbName: String = properties.getOrElse("dbName",
-      CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
-    val tableName: String = properties.getOrElse("tableName", "").toLowerCase
-    val model = createTableInfoFromParams(properties, dataSchema, dbName, tableName)
-    val tableInfo: TableInfo = TableNewProcessor(model)
-    val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession, CarbonProperties.getStorePath)
-    val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    val model = createTableInfoFromParams(properties, dataSchema, identifier)
+    val tableInfo: TableInfo = TableNewProcessor(model, identifier)
     val schemaEvolutionEntry = new SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
-    tableInfo.getFactTable.getSchemaEvalution.
-      getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
     val map = if (metaStore.isReadFromHiveMetaStore) {
-      val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
-      val schemaMetadataPath =
-        CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-      tableInfo.setMetaDataFilepath(schemaMetadataPath)
-      tableInfo.setTablePath(tableIdentifier.getTablePath)
       CarbonUtil.convertToMultiStringMap(tableInfo)
     } else {
-      metaStore.saveToDisk(tableInfo, tablePath)
+      metaStore.saveToDisk(tableInfo, identifier.getTablePath)
       new java.util.HashMap[String, String]()
     }
     properties.foreach(e => map.put(e._1, e._2))
-    map.put("tablePath", tablePath)
-    map.put("dbname", dbName)
+    map.put("tablepath", identifier.getTablePath)
+    map.put("dbname", identifier.getDatabaseName)
     map.asScala.toMap
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 622bf0a..f90abb8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -60,7 +60,7 @@ case class CarbonCreateDataMapCommand(
     } else {
       val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
       dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))
-      val dbName = GetDB.getDatabaseName(tableIdentifier.database, sparkSession)
+      val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession)
       // updating the parent table about dataschema
       PreAggregateUtil.updateMainTable(dbName, tableIdentifier.table, dataMapSchema, sparkSession)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 46c803d..9a71523 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command.datamap
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
@@ -54,14 +54,12 @@ case class CarbonDropDataMapCommand(
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val identifier = TableIdentifier(tableName, Option(dbName))
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
     val catalog = carbonEnv.carbonMetastore
-    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonProperties.getStorePath)
-    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
+    val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
     val tableIdentifier =
       AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
     catalog.checkSchemasModifiedTimeAndReloadTables()
@@ -136,11 +134,7 @@ case class CarbonDropDataMapCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
-    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonProperties.getStorePath)
-    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
-    val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
+    val tableIdentifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession)
     DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
index 45f99fd..d37ca0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
@@ -32,9 +32,9 @@ object DataMapDropTablePostListener extends OperationEventListener {
     val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
     val carbonTable = dropPostEvent.carbonTable
     val sparkSession = dropPostEvent.sparkSession
-    if (carbonTable.isDefined && carbonTable.get.hasDataMapSchema) {
+    if (carbonTable.hasDataMapSchema) {
       // drop all child tables
-      val childSchemas = carbonTable.get.getTableInfo.getDataMapSchemaList
+      val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList
       childSchemas.asScala
         .filter(_.getRelationIdentifier != null)
         .foreach { childSchema =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 0011395..eacfded 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{Checker, DataCommand, DataProcessOperation, RunnableCommand}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Clean data in table
@@ -65,9 +63,8 @@ case class CarbonCleanFilesCommand(
 
   private def deleteAllData(sparkSession: SparkSession,
       databaseNameOp: Option[String], tableName: String): Unit = {
-    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonProperties.getStorePath)
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
     CarbonStore.cleanFiles(
       dbName,
       tableName,
@@ -85,16 +82,14 @@ case class CarbonCleanFilesCommand(
     OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
 
     CarbonStore.cleanFiles(
-      GetDB.getDatabaseName(databaseNameOp, sparkSession),
+      CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
       tableName,
       CarbonProperties.getStorePath,
       carbonTable,
       forceTableClean)
 
-    val cleanFilesPostEvent: CleanFilesPostEvent =
-      CleanFilesPostEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
+    val cleanFilesPostEvent: CleanFilesPostEvent = CleanFilesPostEvent(carbonTable, sparkSession)
+    OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent)
   }
 
   private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 06eb657..a2819cc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
@@ -42,7 +42,7 @@ case class CarbonDeleteLoadByIdCommand(
 
     CarbonStore.deleteLoadById(
       loadIds,
-      GetDB.getDatabaseName(databaseNameOp, sparkSession),
+      CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
       tableName,
       carbonTable
     )

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index dbfb030..490bb58 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
@@ -42,7 +42,7 @@ case class CarbonDeleteLoadByLoadDateCommand(
 
     CarbonStore.deleteLoadByDate(
       loadDate,
-      GetDB.getDatabaseName(databaseNameOp, sparkSession),
+      CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
       tableName,
       carbonTable)
     val deleteSegmentPostEvent: DeleteSegmentByDatePostEvent =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 9d21468..ff13299 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
-import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel}
+import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{CausedBy, FileUtils}
 
@@ -59,11 +59,7 @@ case class CarbonLoadDataCommand(
     dataFrame: Option[DataFrame] = None,
     updateModel: Option[UpdateTableModel] = None,
     var tableInfoOp: Option[TableInfo] = None)
-  extends RunnableCommand with DataProcessOperation {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processData(sparkSession)
-  }
+  extends DataCommand {
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -75,8 +71,6 @@ case class CarbonLoadDataCommand(
       }
     }
 
-    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
 
@@ -99,6 +93,7 @@ case class CarbonLoadDataCommand(
     // update the property with new value
     carbonProperty.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading)
 
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     try {
       val table = if (tableInfoOp.isDefined) {
@@ -218,8 +213,7 @@ case class CarbonLoadDataCommand(
                                   table.getTableName + "/"
           val fileType = FileFactory.getFileType(partitionLocation)
           if (FileFactory.isFileExist(partitionLocation, fileType)) {
-            val file = FileFactory
-              .getCarbonFile(partitionLocation, fileType)
+            val file = FileFactory.getCarbonFile(partitionLocation, fileType)
             CarbonUtil.deleteFoldersAndFiles(file)
           }
         } catch {
@@ -267,7 +261,6 @@ case class CarbonLoadDataCommand(
         dimensions,
         carbonLoadModel,
         sparkSession.sqlContext,
-        carbonLoadModel.getTablePath,
         dictFolderPath)
     }
     if (!StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
@@ -275,7 +268,6 @@ case class CarbonLoadDataCommand(
       GlobalDictionaryUtil
         .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
           carbonLoadModel,
-          carbonLoadModel.getTablePath,
           carbonTableIdentifier,
           dictFolderPath,
           dimensions,
@@ -334,10 +326,11 @@ case class CarbonLoadDataCommand(
       val getSegIdUDF = udf((tupleId: String) =>
         CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
       // getting all fields except tupleId field as it is not required in the value
-      var otherFields = fields.toSeq
-        .filter(field => !field.name
-          .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-        .map(field => new Column(field.name))
+      var otherFields = fields.toSeq.filter { field =>
+        !field.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
+      }.map { field =>
+        new Column(field.name)
+      }
 
       // extract tupleId field which will be used as a key
       val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
@@ -356,10 +349,10 @@ case class CarbonLoadDataCommand(
     GlobalDictionaryUtil.generateGlobalDictionary(
       sparkSession.sqlContext,
       carbonLoadModel,
-      carbonLoadModel.getTablePath,
       hadoopConf,
       dictionaryDataFrame)
-    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+    CarbonDataRDDFactory.loadCarbonData(
+      sparkSession.sqlContext,
       carbonLoadModel,
       carbonLoadModel.getTablePath,
       columnar,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index c6898b2..e0311cb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.types.{StringType, TimestampType}
@@ -43,8 +43,6 @@ case class CarbonShowLoadsCommand(
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     CarbonStore.showSegments(
-      GetDB.getDatabaseName(databaseNameOp, sparkSession),
-      tableName,
       limit,
       carbonTable.getMetaDataFilepath
     )

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 3d65862..ecc48cf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.mutation
 import org.apache.spark.sql.{CarbonEnv, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -34,7 +33,8 @@ import org.apache.carbondata.processing.loading.FailureCauses
  */
 private[sql] case class CarbonProjectForDeleteCommand(
     plan: LogicalPlan,
-    identifier: Seq[String],
+    databaseNameOp: Option[String],
+    tableName: String,
     timestamp: String)
   extends DataCommand {
 
@@ -44,10 +44,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
     val dataFrame = Dataset.ofRows(sparkSession, plan)
     val dataRdd = dataFrame.rdd
 
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
-      asInstanceOf[CarbonRelation]
-    val carbonTable = relation.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
 
     // trigger event for Delete from table
     val operationContext = new OperationContext
@@ -62,7 +59,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
     try {
       lockStatus = metadataLock.lockWithRetries()
       LOGGER.audit(s" Delete data request has been received " +
-                   s"for ${ relation.databaseName }.${ relation.tableName }.")
+                   s"for ${carbonTable.getDatabaseName}.${carbonTable.getTableName}.")
       if (lockStatus) {
         LOGGER.info("Successfully able to get the table metadata file lock")
       } else {
@@ -73,10 +70,16 @@ private[sql] case class CarbonProjectForDeleteCommand(
       // handle the clean up of IUD.
       CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
-      if (DeleteExecution.deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp,
-        isUpdateOperation = false, executorErrors)) {
+      if (DeleteExecution.deleteDeltaExecution(
+        databaseNameOp,
+        tableName,
+        sparkSession,
+        dataRdd,
+        timestamp,
+        isUpdateOperation = false,
+        executorErrors)) {
         // call IUD Compaction.
-        HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation,
+        HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
           isUpdateOperation = false)
 
         // trigger post event for Delete from table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 3aadec3..75008ad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -35,11 +34,12 @@ import org.apache.carbondata.processing.loading.FailureCauses
 
 private[sql] case class CarbonProjectForUpdateCommand(
     plan: LogicalPlan,
-    tableIdentifier: Seq[String])
+    databaseNameOp: Option[String],
+    tableName: String)
   extends DataCommand {
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER = LogServiceFactory.getLogService(CarbonProjectForUpdateCommand.getClass.getName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
     val res = plan find {
       case relation: LogicalRelation if relation.relation
@@ -51,10 +51,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
     if (res.isEmpty) {
       return Seq.empty
     }
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
-      asInstanceOf[CarbonRelation]
-    val carbonTable = relation.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
 
     // trigger event for Update table
     val operationContext = new OperationContext
@@ -94,22 +91,35 @@ private[sql] case class CarbonProjectForUpdateCommand(
       CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
       // do delete operation.
-      DeleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
-        currentTime + "", isUpdateOperation = true, executionErrors)
-
-      if(executionErrors.failureCauses != FailureCauses.NONE) {
+      DeleteExecution.deleteDeltaExecution(
+        databaseNameOp,
+        tableName,
+        sparkSession,
+        dataSet.rdd,
+        currentTime + "",
+        isUpdateOperation = true,
+        executionErrors)
+
+      if (executionErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executionErrors.errorMsg)
       }
 
       // do update operation.
-      performUpdate(dataSet, tableIdentifier, plan, sparkSession, currentTime, executionErrors)
-
-      if(executionErrors.failureCauses != FailureCauses.NONE) {
+      performUpdate(dataSet,
+        databaseNameOp,
+        tableName,
+        plan,
+        sparkSession,
+        currentTime,
+        executionErrors)
+
+      if (executionErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executionErrors.errorMsg)
       }
 
       // Do IUD Compaction.
-      HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true)
+      HorizontalCompaction.tryHorizontalCompaction(
+        sparkSession, carbonTable, isUpdateOperation = true)
 
       // trigger event for Update table
       val updateTablePostEvent: UpdateTablePostEvent =
@@ -150,21 +160,21 @@ private[sql] case class CarbonProjectForUpdateCommand(
 
   private def performUpdate(
       dataFrame: Dataset[Row],
-      tableIdentifier: Seq[String],
+      databaseNameOp: Option[String],
+      tableName: String,
       plan: LogicalPlan,
       sparkSession: SparkSession,
       currentTime: Long,
       executorErrors: ExecutionErrors): Unit = {
 
     def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
-
-      val tableName = relation.identifier.getCarbonTableIdentifier.getTableName
-      val dbName = relation.identifier.getCarbonTableIdentifier.getDatabaseName
-      (tableIdentifier.size > 1 &&
-       tableIdentifier(0) == dbName &&
-       tableIdentifier(1) == tableName) ||
-      (tableIdentifier(0) == tableName)
+      val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+      (databaseNameOp.isDefined &&
+       databaseNameOp.get == dbName &&
+       tableName == relation.identifier.getCarbonTableIdentifier.getTableName) ||
+      (tableName == relation.identifier.getCarbonTableIdentifier.getTableName)
     }
+
     def getHeader(relation: CarbonDatasourceHadoopRelation, plan: LogicalPlan): String = {
       var header = ""
       var found = false


Mime
View raw message