carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [3/7] carbondata git commit: [CARBONDATA-1284]Implement hive based schema storage in carbon
Date Thu, 27 Jul 2017 12:42:03 GMT
[CARBONDATA-1284]Implement hive based schema storage in carbon

This closes #1149


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/042a05a5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/042a05a5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/042a05a5

Branch: refs/heads/master
Commit: 042a05a58cec223086ad55ad48ca27e34c40d135
Parents: 92fe63c
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Sat Jul 8 17:14:04 2017 +0530
Committer: Raghunandan S <carbondatacontributions@gmail.com>
Committed: Thu Jul 27 18:47:52 2017 +0800

----------------------------------------------------------------------
 .../execution/command/carbonTableSchema.scala   |  30 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    | 384 ++++++++++---------
 2 files changed, 225 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/042a05a5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 46b58c5..1781477 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -233,10 +233,10 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
 
           sparkSession.sql(
             s"""CREATE TABLE $dbName.$tbName
-               |(${ fields.map(f => f.rawSchema).mkString(",") })
-               |USING org.apache.spark.sql.CarbonSource""".stripMargin +
-                s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin
+
-                s""""$tablePath" $carbonSchemaString) """)
+                |(${ fields.map(f => f.rawSchema).mkString(",") })
+                |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin
+
+            s""""$tablePath"$carbonSchemaString) """)
         } catch {
           case e: Exception =>
             val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
@@ -268,8 +268,8 @@ case class DeleteLoadsById(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-        lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-        tableMeta.carbonTable
+      lookupRelation(databaseNameOp,  tableName)(sparkSession).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
     CarbonStore.deleteLoadById(
       loadids,
       getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -293,8 +293,8 @@ case class DeleteLoadsByLoadDate(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-        lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-        tableMeta.carbonTable
+      lookupRelation(databaseNameOp,  tableName)(sparkSession).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
     CarbonStore.deleteLoadByDate(
       loadDate,
       getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -750,11 +750,11 @@ case class LoadTable(
             (dataFrame, dataFrame)
           }
 
-          GlobalDictionaryUtil.generateGlobalDictionary(
-                sparkSession.sqlContext,
-                carbonLoadModel,
-                relation.tableMeta.storePath,
-                dictionaryDataFrame)
+            GlobalDictionaryUtil.generateGlobalDictionary(
+              sparkSession.sqlContext,
+              carbonLoadModel,
+              relation.tableMeta.storePath,
+              dictionaryDataFrame)
           CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,
@@ -847,8 +847,8 @@ case class ShowLoads(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-        lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-        tableMeta.carbonTable
+      lookupRelation(databaseNameOp,  tableName)(sparkSession).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
     CarbonStore.showSegments(
       getDB.getDatabaseName(databaseNameOp, sparkSession),
       tableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/042a05a5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 2407054..048681c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -22,23 +22,22 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
 import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.table
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -63,7 +62,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
-class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
+class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore {
 
   @transient
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -78,7 +77,7 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
     System.nanoTime() + ""
   }
 
-  val metadata = MetaData(new ArrayBuffer[TableMeta]())
+  lazy val metadata = loadMetadata(storePath, nextQueryId)
 
 
   /**
@@ -91,22 +90,9 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
   override def createCarbonRelation(parameters: Map[String, String],
       absIdentifier: AbsoluteTableIdentifier,
       sparkSession: SparkSession): CarbonRelation = {
-    val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName
-    val tableName = absIdentifier.getCarbonTableIdentifier.getTableName
-    val tables = getTableFromMetadataCache(database, tableName)
-    tables match {
-      case Some(t) =>
-        CarbonRelation(database, tableName,
-          CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
-      case None =>
-        readCarbonSchema(absIdentifier) match {
-          case Some(meta) =>
-            CarbonRelation(database, tableName,
-              CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
-          case None =>
-            throw new NoSuchTableException(database, tableName)
-        }
-    }
+    lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName,
+      Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession)
+      .asInstanceOf[CarbonRelation]
   }
 
   def lookupRelation(dbName: Option[String], tableName: String)
@@ -114,21 +100,20 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
   }
 
-  override def lookupRelation(tableIdentifier: TableIdentifier)
+  def lookupRelation(tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): LogicalPlan = {
+    checkSchemasModifiedTimeAndReloadTables()
     val database = tableIdentifier.database.getOrElse(
-      sparkSession.catalog.currentDatabase)
-    val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
-      case SubqueryAlias(_,
-      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
-      _) =>
-        carbonDatasourceHadoopRelation.carbonRelation
-      case LogicalRelation(
-      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
-        carbonDatasourceHadoopRelation.carbonRelation
-      case _ => throw new NoSuchTableException(database, tableIdentifier.table)
+      sparkSession.catalog.currentDatabase
+    )
+    val tables = getTableFromMetadata(database, tableIdentifier.table, true)
+    tables match {
+      case Some(t) =>
+        CarbonRelation(database, tableIdentifier.table,
+          CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head)
+      case None =>
+        throw new NoSuchTableException(database, tableIdentifier.table)
     }
-    relation
   }
 
   /**
@@ -138,7 +123,8 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
    * @param tableName
    * @return
    */
-  def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
+  def getTableFromMetadata(database: String,
+      tableName: String, readStore: Boolean = false): Option[TableMeta] = {
     metadata.tablesMeta
       .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
                  c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
@@ -150,48 +136,99 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
    tableExists(TableIdentifier(table, databaseOp))(sparkSession)
   }
 
-  override def tableExists(tableIdentifier: TableIdentifier)
-    (sparkSession: SparkSession): Boolean = {
-    try {
-      lookupRelation(tableIdentifier)(sparkSession)
-    } catch {
-      case e: Exception =>
-        return false
+  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+    checkSchemasModifiedTimeAndReloadTables()
+    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tables = metadata.tablesMeta.filter(
+      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+    tables.nonEmpty
+  }
+
+  def loadMetadata(metadataPath: String, queryId: String): MetaData = {
+    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+    val statistic = new QueryStatistic()
+    // creating zookeeper instance once.
+    // if zookeeper is configured as carbon lock type.
+    val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
+    if (null != zookeeperurl) {
+      CarbonProperties.getInstance
+        .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
+    }
+    if (metadataPath == null) {
+      return null
+    }
+    // if no locktype is configured and store type is HDFS set HDFS lock as default
+    if (null == CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
+        FileType.HDFS == FileFactory.getFileType(metadataPath)) {
+      CarbonProperties.getInstance
+        .addProperty(CarbonCommonConstants.LOCK_TYPE,
+          CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
+        )
+      LOGGER.info("Default lock type HDFSLOCK is configured")
     }
-    true
+    val fileType = FileFactory.getFileType(metadataPath)
+    val metaDataBuffer = new ArrayBuffer[TableMeta]
+    fillMetaData(metadataPath, fileType, metaDataBuffer)
+    updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
+    statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
+      System.currentTimeMillis())
+    recorder.recordStatisticsForDriver(statistic, queryId)
+    MetaData(metaDataBuffer)
   }
 
-  private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
-    val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
-    val tableName = identifier.getCarbonTableIdentifier.getTableName
-    val storePath = identifier.getStorePath
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
-      tableName.toLowerCase(), UUID.randomUUID().toString)
-    val carbonTablePath =
-      CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
-    val tableMetadataFile = carbonTablePath.getSchemaFilePath
-    val fileType = FileFactory.getFileType(tableMetadataFile)
-    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-      val tableUniqueName = dbName + "_" + tableName
-      val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
-      val schemaConverter = new ThriftWrapperSchemaConverterImpl
-      val wrapperTableInfo = schemaConverter
-        .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
-      val schemaFilePath = CarbonStorePath
-        .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
-      wrapperTableInfo.setStorePath(storePath)
-      wrapperTableInfo
-        .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-      val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-      val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
-        identifier.getStorePath,
-        identifier.getTablePath,
-        carbonTable)
-      metadata.tablesMeta += tableMeta
-      Some(tableMeta)
-    } else {
-      None
+  private def fillMetaData(basePath: String, fileType: FileType,
+      metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
+    val databasePath = basePath // + "/schemas"
+    try {
+      if (FileFactory.isFileExist(databasePath, fileType)) {
+        val file = FileFactory.getCarbonFile(databasePath, fileType)
+        val databaseFolders = file.listFiles()
+
+        databaseFolders.foreach(databaseFolder => {
+          if (databaseFolder.isDirectory) {
+            val dbName = databaseFolder.getName
+            val tableFolders = databaseFolder.listFiles()
+
+            tableFolders.foreach(tableFolder => {
+              if (tableFolder.isDirectory) {
+                val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
+                  tableFolder.getName, UUID.randomUUID().toString)
+                val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
+                  carbonTableIdentifier)
+                val tableMetadataFile = carbonTablePath.getSchemaFilePath
+
+                if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+                  val tableName = tableFolder.getName
+                  val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
+                  val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+                  val schemaConverter = new ThriftWrapperSchemaConverterImpl
+                  val wrapperTableInfo = schemaConverter
+                    .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
+                  val schemaFilePath = CarbonStorePath
+                    .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+                  wrapperTableInfo.setStorePath(storePath)
+                  wrapperTableInfo
+                    .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+                  CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+                  val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+                  metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
+                    carbonTable)
+                }
+              }
+            })
+          }
+        })
+      } else {
+        // Create folders and files.
+        FileFactory.mkdirs(databasePath, fileType)
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        s.printStackTrace()
+        // Create folders and files.
+        FileFactory.mkdirs(databasePath, fileType)
     }
   }
 
@@ -201,15 +238,15 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
    * @param newTableIdentifier
    * @param thriftTableInfo
    * @param schemaEvolutionEntry
-   * @param tablePath
+   * @param carbonStorePath
    * @param sparkSession
    */
   def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
-      tablePath: String) (sparkSession: SparkSession): String = {
-    val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
@@ -218,19 +255,11 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
       .fromExternalToWrapperTableInfo(thriftTableInfo,
           newTableIdentifier.getDatabaseName,
           newTableIdentifier.getTableName,
-        absoluteTableIdentifier.getStorePath)
-    val identifier =
-      new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
-        newTableIdentifier.getTableName,
-        wrapperTableInfo.getFactTable.getTableId)
-    val path = createSchemaThriftFile(wrapperTableInfo,
+          carbonStorePath)
+    createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
-      identifier)
-    addTableCache(wrapperTableInfo,
-      AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
       newTableIdentifier.getDatabaseName,
-      newTableIdentifier.getTableName))
-    path
+      newTableIdentifier.getTableName)(sparkSession)
   }
 
   /**
@@ -238,27 +267,25 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
-   * @param tablePath
+   * @param carbonStorePath
    * @param sparkSession
    */
   def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      tablePath: String)(sparkSession: SparkSession): String = {
-    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         carbonTableIdentifier.getDatabaseName,
         carbonTableIdentifier.getTableName,
-        tableIdentifier.getStorePath)
+        carbonStorePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
-    wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
-    val path = createSchemaThriftFile(wrapperTableInfo,
+    createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
-      tableIdentifier.getCarbonTableIdentifier)
-    addTableCache(wrapperTableInfo, tableIdentifier)
-    path
+      carbonTableIdentifier.getDatabaseName,
+      carbonTableIdentifier.getTableName)(sparkSession)
   }
 
 
@@ -269,38 +296,24 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
    * Load CarbonTable from wrapper tableInfo
    *
    */
-  def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) {
+  def createTableFromThrift(
+      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
+      dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = {
+    if (tableExists(tableName, Some(dbName))(sparkSession)) {
+      sys.error(s"Table [$tableName] already exists under Database [$dbName]")
+    }
+    val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val dbName = tableInfo.getDatabaseName
-    val tableName = tableInfo.getFactTable.getTableName
     val thriftTableInfo = schemaConverter
       .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
-    tableInfo.setStorePath(identifier.getStorePath)
-    createSchemaThriftFile(tableInfo,
+    thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
+      .add(schemaEvolutionEntry)
+    val carbonTablePath = createSchemaThriftFile(tableInfo,
       thriftTableInfo,
-      identifier.getCarbonTableIdentifier)
+      dbName,
+      tableName)(sparkSession)
     LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
-  }
-
-  /**
-   * Generates schema string from TableInfo
-   */
-  override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
-      tablePath: String): String = {
-    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
-    val schemaMetadataPath =
-      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(tableIdentifier.getStorePath)
-    val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
-    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
-    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
-    removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
-    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    addTableCache(tableInfo, tableIdentifier)
-    CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
+    (carbonTablePath, "")
   }
 
   /**
@@ -308,16 +321,23 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
    *
    * @param tableInfo
    * @param thriftTableInfo
+   * @param dbName
+   * @param tableName
+   * @param sparkSession
    * @return
    */
-  private def createSchemaThriftFile(tableInfo: schema.table.TableInfo,
-      thriftTableInfo: TableInfo,
-      carbonTableIdentifier: CarbonTableIdentifier): String = {
-    val carbonTablePath = CarbonStorePath.
-      getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
+  private def createSchemaThriftFile(
+      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      dbName: String, tableName: String)
+    (sparkSession: SparkSession): String = {
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
+      tableInfo.getFactTable.getTableId)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
+    tableInfo.setStorePath(storePath)
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
       FileFactory.mkdirs(schemaMetadataPath, fileType)
@@ -326,20 +346,13 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
     thriftWriter.open(FileWriteOperation.OVERWRITE)
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
-    carbonTablePath.getPath
-  }
-
-  protected def addTableCache(tableInfo: table.TableInfo,
-      absoluteTableIdentifier: AbsoluteTableIdentifier) = {
-    val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
-    CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
-    removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
+    removeTableFromMetadata(dbName, tableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
-      absoluteTableIdentifier.getTablePath,
-      CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
+    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+      CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
     metadata.tablesMeta += tableMeta
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+    carbonTablePath.getPath
   }
 
   /**
@@ -349,15 +362,13 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
    * @param tableName
    */
   def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
-    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
+    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
     metadataToBeRemoved match {
       case Some(tableMeta) =>
         metadata.tablesMeta -= tableMeta
         CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
       case None =>
-        if (LOGGER.isDebugEnabled) {
-          LOGGER.debug(s"No entry for table $tableName in database $dbName")
-        }
+        LOGGER.debug(s"No entry for table $tableName in database $dbName")
     }
   }
 
@@ -391,23 +402,23 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
 
 
  def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    try {
-      val tablePath = lookupRelation(tableIdentifier)(sparkSession).
-        asInstanceOf[CarbonRelation].tableMeta.tablePath
-      val fileType = FileFactory.getFileType(tablePath)
-      FileFactory.isFileExist(tablePath, fileType)
-    } catch {
-      case e: Exception =>
-        false
-    }
+    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableName = tableIdentifier.table.toLowerCase
+
+    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
+      new CarbonTableIdentifier(dbName, tableName, "")).getPath
+
+    val fileType = FileFactory.getFileType(tablePath)
+    FileFactory.isFileExist(tablePath, fileType)
   }
 
-  def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
+  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession) {
     val dbName = tableIdentifier.database.get
     val tableName = tableIdentifier.table
-    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
+
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
@@ -418,18 +429,27 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
       // while drop we should refresh the schema modified time so that if any thing has changed
       // in the other beeline need to update.
-      checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
-
-      removeTableFromMetadata(dbName, tableName)
-      updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
+      checkSchemasModifiedTimeAndReloadTables
+      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+      val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
+        tableIdentifier.table)
+      metadataToBeRemoved match {
+        case Some(tableMeta) =>
+          metadata.tablesMeta -= tableMeta
+          CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
+          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+        case None =>
+          LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
+      }
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
       // discard cached table info in cachedDataSourceTables
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
     }
   }
 
-  private def getTimestampFileAndType(basePath: String) = {
-    val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
+    val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
     val timestampFileType = FileFactory.getFileType(timestampFile)
     (timestampFile, timestampFileType)
   }
@@ -443,20 +463,37 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
     tableModifiedTimeStore.put("default", timeStamp)
   }
 
-  def updateAndTouchSchemasUpdatedTime(basePath: String) {
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
+  def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) {
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName))
   }
 
+  /**
+   * This method will read the timestamp of empty schema file
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
+    } else {
+      System.currentTimeMillis()
+    }
+  }
 
   /**
    * This method will check and create an empty schema timestamp file
    *
+   * @param databaseName
+   * @param tableName
    * @return
    */
-  private def touchSchemaFileSystemTime(basePath: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
+  private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long =
{
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
     if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      LOGGER.audit(s"Creating timestamp file for $basePath")
+      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
       FileFactory.createNewFile(timestampFile, timestampFileType)
     }
     val systemTime = System.currentTimeMillis()
@@ -465,9 +502,8 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
     systemTime
   }
 
-  def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
-    val (timestampFile, timestampFileType) =
-      getTimestampFileAndType(storePath)
+  def checkSchemasModifiedTimeAndReloadTables() {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
     if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
       if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
         getLastModifiedTime ==
@@ -478,7 +514,7 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
   }
 
   private def refreshCache() {
-    metadata.tablesMeta.clear()
+    metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
   }
 
   override def isReadFromHiveMetaStore: Boolean = false


Mime
View raw message