carbondata-commits mailing list archives

From jack...@apache.org
Subject [2/6] incubator-carbondata git commit: add spark2 module
Date Wed, 30 Nov 2016 17:46:48 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
new file mode 100644
index 0000000..794de02
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -0,0 +1,750 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import java.text.SimpleDateFormat
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.util.FileUtils
+import org.codehaus.jackson.map.ObjectMapper
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.processing.constants.TableOptionConstant
+import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
+
+/**
+ * Command to run compaction as part of an ALTER TABLE statement.
+ *
+ * @param alterTableModel model holding the target table and compaction details
+ */
+case class AlterTableCompaction(alterTableModel: AlterTableModel) extends
+  RunnableCommand {
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    // TODO : Implement it.
+    val tableName = alterTableModel.tableName
+    val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
+    if (null == org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
+      .getCarbonTable(databaseName + "_" + tableName)) {
+      logError(s"alter table failed. table not found: $databaseName.$tableName")
+      sys.error(s"alter table failed. table not found: $databaseName.$tableName")
+    }
+
+    val relation =
+      CarbonEnv.get.carbonMetastore
+        .lookupRelation(Option(databaseName), tableName)(sparkSession)
+        .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $databaseName.$tableName does not exist")
+    }
+    val carbonLoadModel = new CarbonLoadModel()
+
+
+    val table = relation.tableMeta.carbonTable
+    carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
+    carbonLoadModel.setTableName(table.getFactTableName)
+    val dataLoadSchema = new CarbonDataLoadSchema(table)
+    // Need to fill dimension relation
+    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
+    carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+
+    val kettleHomePath = CarbonScalaUtil.getKettleHome(sparkSession.sqlContext)
+
+    var storeLocation = CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
+        System.getProperty("java.io.tmpdir")
+      )
+    storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
+    try {
+      CarbonDataRDDFactory
+        .alterTableForCompaction(sparkSession.sqlContext,
+          alterTableModel,
+          carbonLoadModel,
+          relation.tableMeta.storePath,
+          kettleHomePath,
+          storeLocation
+        )
+    } catch {
+      case e: Exception =>
+        if (null != e.getMessage) {
+          sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
+        } else {
+          sys.error("Exception in compaction. Please check logs for more info.")
+        }
+    }
+    Seq.empty
+  }
+}
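+
+// Illustrative usage sketch (DDL form assumed, e.g. ALTER TABLE mydb.sales COMPACT 'MINOR'):
+//   AlterTableCompaction(alterTableModel).run(sparkSession)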
+
+case class CreateTable(cm: TableModel) extends RunnableCommand {
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    cm.databaseName = cm.databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+    val tbName = cm.tableName
+    val dbName = cm.databaseName
+    LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
+
+    val tableInfo: TableInfo = TableNewProcessor(cm, sparkSession.sqlContext)
+
+    if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
+      sys.error("No Dimensions found. Table should have at least one dimesnion !")
+    }
+
+//    if (sparkSession.sqlContext.tableNames(dbName).exists(_.equalsIgnoreCase(tbName))) {
+//      if (!cm.ifNotExistsSet) {
+//        LOGGER.audit(
+//          s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
+//          s"Table [$tbName] already exists under database [$dbName]")
+//        sys.error(s"Table [$tbName] already exists under database [$dbName]")
+//      }
+//    } else {
+      // Add Database to catalog and persist
+      val catalog = CarbonEnv.get.carbonMetastore
+      val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
+//      try {
+//        sparkSession.sql(
+//          s"""CREATE TABLE $dbName.$tbName USING carbondata""" +
+//          s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """)
+//          .collect
+//      } catch {
+//        case e: Exception =>
+//          val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
+//          // call the drop table to delete the created table.
+//
+//          CarbonEnv.get.carbonMetastore
+//            .dropTable(catalog.storePath, identifier)(sparkSession)
+//
+//          LOGGER.audit(s"Table creation with Database name [$dbName] " +
+//                       s"and Table name [$tbName] failed")
+//          throw e
+//      }
+
+      LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
+//    }
+
+    Seq.empty
+  }
+
+  def setV(ref: Any, name: String, value: Any): Unit = {
+    ref.getClass.getFields.find(_.getName == name).get
+      .set(ref, value.asInstanceOf[AnyRef])
+  }
+}
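+
+// Sketch: `cm` is a TableModel produced by the carbon DDL parser (construction not shown here);
+// running the command persists the schema through carbonMetastore.createTableFromThrift, e.g.
+//   CreateTable(cm).run(sparkSession)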
+
+case class DeleteLoadsById(
+    loadids: Seq[String],
+    databaseNameOp: Option[String],
+    tableName: String) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+
+    val databaseName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"Delete segment by Id request has been received for $databaseName.$tableName")
+
+    // validate load ids first
+    validateLoadIds
+    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    val relation = CarbonEnv.get.carbonMetastore.lookupRelation(
+      identifier, None)(sparkSession).asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      LOGGER.audit(s"Delete segment by Id is failed. Table $dbName.$tableName does not exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
+
+    if (null == carbonTable) {
+      CarbonEnv.get.carbonMetastore
+        .lookupRelation(identifier, None)(sparkSession).asInstanceOf[CarbonRelation]
+    }
+    val path = carbonTable.getMetaDataFilepath
+
+    try {
+      val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
+        carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
+
+      if (invalidLoadIds.isEmpty) {
+        LOGGER.audit(s"Delete segment by Id is successful for $databaseName.$tableName.")
+      } else {
+        sys.error("Delete segment by Id failed. Invalid ID is:" +
+                  s" ${ invalidLoadIds.mkString(",") }")
+      }
+    } catch {
+      case ex: Exception =>
+        sys.error(ex.getMessage)
+    }
+
+    Seq.empty
+
+  }
+
+  // validates load ids
+  private def validateLoadIds: Unit = {
+    if (loadids.isEmpty) {
+      val errorMessage = "Error: Segment id(s) should not be empty."
+      throw new MalformedCarbonCommandException(errorMessage)
+
+    }
+  }
+}
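+
+// Minimal invocation sketch (segment ids, database and table names are examples):
+//   DeleteLoadsById(Seq("0", "1"), Some("default"), "sales").run(sparkSession)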
+
+case class DeleteLoadsByLoadDate(
+    databaseNameOp: Option[String],
+    tableName: String,
+    dateField: String,
+    loadDate: String) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.TableModel.tableSchema")
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+
+    LOGGER.audit("The delete segment by load date request has been received.")
+    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(identifier, None)(sparkSession).asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      LOGGER.audit(
+        s"Delete segment by load date failed. Table $dbName.$tableName does not exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+
+    val timeObj = Cast(Literal(loadDate), TimestampType).eval()
+    if (null == timeObj) {
+      val errorMessage = "Error: Invalid load start time format " + loadDate
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+
+    val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
+      .getCarbonTable(dbName + '_' + tableName)
+
+    if (null == carbonTable) {
+      var relation = CarbonEnv.get.carbonMetastore
+        .lookupRelation(identifier, None)(sparkSession).asInstanceOf[CarbonRelation]
+    }
+    val path = carbonTable.getMetaDataFilepath()
+
+    try {
+      val invalidLoadTimestamps = SegmentStatusManager.updateDeletionStatus(
+        carbonTable.getAbsoluteTableIdentifier, loadDate, path,
+        timeObj.asInstanceOf[java.lang.Long]).asScala
+      if (invalidLoadTimestamps.isEmpty) {
+        LOGGER.audit(s"Delete segment by date is successful for $dbName.$tableName.")
+      } else {
+        sys.error("Delete segment by date failed. No matching segment found.")
+      }
+    } catch {
+      case ex: Exception =>
+        sys.error(ex.getMessage)
+    }
+    Seq.empty
+
+  }
+
+}
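+
+// Sketch (names are examples); loadDate must be castable to TimestampType, per the Cast above:
+//   DeleteLoadsByLoadDate(Some("default"), "sales", "load_date", "2016-11-30 00:00:00")
+//     .run(sparkSession)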
+
+case class LoadTable(
+    databaseNameOp: Option[String],
+    tableName: String,
+    factPathFromUser: String,
+    dimFilesPath: Seq[DataLoadTableFileMapping],
+    options: scala.collection.immutable.Map[String, String],
+    isOverwriteExist: Boolean = false,
+    var inputSqlString: String = null,
+    dataFrame: Option[DataFrame] = None) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+
+    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    if (isOverwriteExist) {
+      sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
+    }
+    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
+      logError(s"Data loading failed. table not found: $dbName.$tableName")
+      LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+      sys.error(s"Data loading failed. table not found: $dbName.$tableName")
+    }
+
+    val relation = CarbonEnv.get.carbonMetastore
+        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
+    val carbonLock = CarbonLockFactory
+      .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
+        .getCarbonTableIdentifier,
+        LockUsage.METADATA_LOCK
+      )
+    try {
+      if (carbonLock.lockWithRetries()) {
+        logInfo("Successfully able to get the table metadata file lock")
+      } else {
+        sys.error("Table is locked for updation. Please try after some time")
+      }
+
+      val factPath = if (dataFrame.isDefined) {
+        ""
+      } else {
+        FileUtils.getPaths(
+          CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
+      }
+      val carbonLoadModel = new CarbonLoadModel()
+      carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+      carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
+      carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+      if (dimFilesPath.isEmpty) {
+        carbonLoadModel.setDimFolderPath(null)
+      } else {
+        val x = dimFilesPath.map(f => f.table + ":" + CarbonUtil.checkAndAppendHDFSUrl(f.loadPath))
+        carbonLoadModel.setDimFolderPath(x.mkString(","))
+      }
+
+      val table = relation.tableMeta.carbonTable
+      carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
+      carbonLoadModel.setTableName(table.getFactTableName)
+      val dataLoadSchema = new CarbonDataLoadSchema(table)
+      // Need to fill dimension relation
+      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+
+      var partitionLocation = relation.tableMeta.storePath + "/partition/" +
+                              relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
+                              relation.tableMeta.carbonTableIdentifier.getTableName + "/"
+
+
+      val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
+      val kettleHomePath = CarbonScalaUtil.getKettleHome(sparkSession.sqlContext)
+
+      // TODO It will be removed after kettle is removed.
+      val useKettle = options.get("use_kettle") match {
+        case Some(value) => value.toBoolean
+        case _ =>
+          val useKettleLocal = System.getProperty("use.kettle")
+          if (useKettleLocal == null) {
+            sparkSession.sqlContext.sparkContext.getConf.get("use_kettle_default", "true").toBoolean
+          } else {
+            useKettleLocal.toBoolean
+          }
+      }
+
+      val delimiter = options.getOrElse("delimiter", ",")
+      val quoteChar = options.getOrElse("quotechar", "\"")
+      val fileHeader = options.getOrElse("fileheader", "")
+      val escapeChar = options.getOrElse("escapechar", "\\")
+      val commentchar = options.getOrElse("commentchar", "#")
+      val columnDict = options.getOrElse("columndict", null)
+      val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
+      val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
+      val badRecordsLoggerRedirect = options.getOrElse("bad_records_action", "force")
+      val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
+      val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
+      val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
+      val dateFormat = options.getOrElse("dateformat", null)
+      validateDateFormat(dateFormat, table)
+      val multiLine = options.getOrElse("multiline", "false").trim.toLowerCase match {
+        case "true" => true
+        case "false" => false
+        case illegal =>
+          val errorMessage = "Illegal syntax found: [" + illegal + "] .The value multiline in " +
+                             "load DDL which you set can only be 'true' or 'false', please check " +
+                             "your input DDL."
+          throw new MalformedCarbonCommandException(errorMessage)
+      }
+      val maxColumns = options.getOrElse("maxcolumns", null)
+      carbonLoadModel.setMaxColumns(maxColumns)
+      carbonLoadModel.setEscapeChar(escapeChar)
+      carbonLoadModel.setQuoteChar(quoteChar)
+      carbonLoadModel.setCommentChar(commentchar)
+      carbonLoadModel.setDateFormat(dateFormat)
+      carbonLoadModel
+        .setSerializationNullFormat(
+          TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + serializationNullFormat)
+      carbonLoadModel
+        .setBadRecordsLoggerEnable(
+          TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable)
+      carbonLoadModel
+        .setBadRecordsAction(
+          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsLoggerRedirect)
+
+      if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) ||
+          complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) ||
+          delimiter.equalsIgnoreCase(complex_delimiter_level_2)) {
+        sys.error(s"Field Delimiter & Complex types delimiter are same")
+      }
+      else {
+        carbonLoadModel.setComplexDelimiterLevel1(
+          CarbonUtil.delimiterConverter(complex_delimiter_level_1))
+        carbonLoadModel.setComplexDelimiterLevel2(
+          CarbonUtil.delimiterConverter(complex_delimiter_level_2))
+      }
+      // set local dictionary path, and dictionary file extension
+      carbonLoadModel.setAllDictPath(allDictionaryPath)
+
+      var partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+      try {
+        // First system has to partition the data first and then call the load data
+        LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+        carbonLoadModel.setFactFilePath(factPath)
+        carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
+        carbonLoadModel.setCsvHeader(fileHeader)
+        carbonLoadModel.setColDictFilePath(columnDict)
+        carbonLoadModel.setDirectLoad(true)
+        GlobalDictionaryUtil
+          .generateGlobalDictionary(
+          sparkSession.sqlContext, carbonLoadModel, relation.tableMeta.storePath, dataFrame)
+        CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+            carbonLoadModel,
+            relation.tableMeta.storePath,
+            kettleHomePath,
+            columnar,
+            partitionStatus,
+            useKettle,
+            dataFrame)
+      } catch {
+        case ex: Exception =>
+          LOGGER.error(ex)
+          LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
+          throw ex
+      } finally {
+        // Once the data load is successful delete the unwanted partition files
+        try {
+          val fileType = FileFactory.getFileType(partitionLocation)
+          if (FileFactory.isFileExist(partitionLocation, fileType)) {
+            val file = FileFactory
+              .getCarbonFile(partitionLocation, fileType)
+            CarbonUtil.deleteFoldersAndFiles(file)
+          }
+        } catch {
+          case ex: Exception =>
+            LOGGER.error(ex)
+            LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
+                         "Problem deleting the partition folder")
+            throw ex
+        }
+
+      }
+    } catch {
+      case dle: DataLoadingException =>
+        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
+        throw dle
+      case mce: MalformedCarbonCommandException =>
+        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
+        throw mce
+    } finally {
+      if (carbonLock != null) {
+        if (carbonLock.unlock()) {
+          logInfo("Table MetaData Unlocked Successfully after data load")
+        } else {
+          logError("Unable to unlock Table MetaData")
+        }
+      }
+    }
+    Seq.empty
+  }
+
+  private def validateDateFormat(dateFormat: String, table: CarbonTable): Unit = {
+    val dimensions = table.getDimensionByTableName(tableName).asScala
+    if (dateFormat != null) {
+      if (dateFormat.trim == "") {
+        throw new MalformedCarbonCommandException("Error: Option DateFormat is set an empty " +
+                                                  "string.")
+      } else {
+        var dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
+        for (singleDateFormat <- dateFormats) {
+          val dateFormatSplits: Array[String] = singleDateFormat.split(":", 2)
+          val columnName = dateFormatSplits(0).trim.toLowerCase
+          if (!dimensions.exists(_.getColName.equals(columnName))) {
+            throw new MalformedCarbonCommandException("Error: Wrong Column Name " +
+                                                      dateFormatSplits(0) +
+                                                      " is provided in Option DateFormat.")
+          }
+          if (dateFormatSplits.length < 2 || dateFormatSplits(1).trim.isEmpty) {
+            throw new MalformedCarbonCommandException("Error: Option DateFormat is not provided " +
+                                                      "for " + "Column " + dateFormatSplits(0) +
+                                                      ".")
+          }
+        }
+      }
+    }
+  }
+}
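+
+// Construction sketch; option keys mirror the getOrElse lookups above, values are examples only:
+//   LoadTable(Some("default"), "sales", "hdfs://nn/data/sales.csv", Nil,
+//     Map("delimiter" -> ",", "quotechar" -> "\"", "fileheader" -> "id,name,amount",
+//         "bad_records_logger_enable" -> "true")).run(sparkSession)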
+
+private[sql] case class DescribeCommandFormatted(
+    child: SparkPlan,
+    override val output: Seq[Attribute],
+    tblIdentifier: TableIdentifier)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+    val mapper = new ObjectMapper()
+    val colProps = StringBuilder.newBuilder
+    var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
+      val comment = if (relation.metaData.dims.contains(field.name)) {
+        val dimension = relation.metaData.carbonTable.getDimensionByName(
+          relation.tableMeta.carbonTableIdentifier.getTableName,
+          field.name)
+        if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
+          val colProp = mapper.writeValueAsString(dimension.getColumnProperties)
+          colProps.append(field.name).append(".")
+            .append(colProp)
+            .append(",")
+        }
+        if (dimension.hasEncoding(Encoding.DICTIONARY) &&
+            !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          "DICTIONARY, KEY COLUMN"
+        } else {
+          "KEY COLUMN"
+        }
+      } else {
+        ("MEASURE")
+      }
+      (field.name, field.dataType.simpleString, comment)
+    }
+    val colPropStr = if (colProps.toString().trim().length() > 0) {
+      // drops additional comma at end
+      colProps.toString().dropRight(1)
+    } else {
+      colProps.toString()
+    }
+    results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
+    results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
+      .getDatabaseName, "")
+    )
+    results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
+    results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
+    val carbonTable = relation.tableMeta.carbonTable
+    results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
+    results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
+    if (colPropStr.length() > 0) {
+      results ++= Seq((colPropStr, "", ""))
+    } else {
+      results ++= Seq(("NONE", "", ""))
+    }
+    val dimension = carbonTable
+      .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+    results ++= getColumnGroups(dimension.asScala.toList)
+    results.map { case (name, dataType, comment) =>
+      Row(f"$name%-36s $dataType%-80s $comment%-72s")
+    }
+  }
+
+  private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
+    var results: Seq[(String, String, String)] =
+      Seq(("", "", ""), ("##Column Group Information", "", ""))
+    val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
+      case (groupId, _) => groupId != -1
+    }.toSeq.sortBy(_._1)
+    val groups = groupedDimensions.map(colGroups => {
+      colGroups._2.map(dim => dim.getColName).mkString(", ")
+    })
+    var index = 1
+    groups.map { x =>
+      results = results :+ (s"Column Group $index", x, "")
+      index = index + 1
+    }
+    results
+  }
+}
+
+private[sql] case class DeleteLoadByDate(
+    databaseNameOp: Option[String],
+    tableName: String,
+    dateField: String,
+    dateValue: String
+) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
+    var level: String = ""
+    val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+      .getInstance().getCarbonTable(dbName + '_' + tableName)
+    if (relation == null) {
+      LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
+      filter => filter.name.equalsIgnoreCase(dateField) &&
+                filter.dataType.isInstanceOf[TimestampType]).toList
+    if (matches.isEmpty) {
+      LOGGER.audit("The delete load by date is failed. " +
+                   s"Table $dbName.$tableName does not contain date field: $dateField")
+      sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
+    } else {
+      level = matches.head.name
+    }
+    val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
+      .getColName
+    CarbonDataRDDFactory.deleteLoadByDate(
+      sparkSession.sqlContext,
+      new CarbonDataLoadSchema(carbonTable),
+      dbName,
+      tableName,
+      CarbonEnv.get.carbonMetastore.storePath,
+      level,
+      actualColName,
+      dateValue)
+    LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
+    Seq.empty
+  }
+
+}
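+
+// Sketch (example names); dateField must be a TimestampType dimension of the table:
+//   DeleteLoadByDate(Some("default"), "sales", "load_date", "2016-11-30").run(sparkSession)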
+
+case class CleanFiles(
+    databaseNameOp: Option[String],
+    tableName: String) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      LOGGER.audit(s"The clean files request is failed. Table $dbName.$tableName does not exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+
+    val carbonLoadModel = new CarbonLoadModel()
+    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
+    val table = relation.tableMeta.carbonTable
+    carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
+    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+    val dataLoadSchema = new CarbonDataLoadSchema(table)
+    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+    try {
+      CarbonDataRDDFactory.cleanFiles(
+        sparkSession.sqlContext.sparkContext,
+        carbonLoadModel,
+        relation.tableMeta.storePath)
+      LOGGER.audit(s"Clean files request is successfull for $dbName.$tableName.")
+    } catch {
+      case ex: Exception =>
+        sys.error(ex.getMessage)
+    }
+    Seq.empty
+  }
+}
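+
+// Sketch (example names): CleanFiles(Some("default"), "sales").run(sparkSession)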
+
+case class ShowLoads(
+    databaseNameOp: Option[String],
+    tableName: String,
+    limit: Option[String],
+    override val output: Seq[Attribute]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val databaseName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableUniqueName = databaseName + "_" + tableName
+    // tableExists calls checkSchemasModifiedTimeAndReloadTables to reload metadata if the
+    // schema was changed by another process, so that tableInfoMap would be refilled.
+    val tableExists = CarbonEnv.get.carbonMetastore
+        .tableExists(TableIdentifier(tableName, databaseNameOp))(sparkSession)
+    if (!tableExists) {
+      sys.error(s"$databaseName.$tableName is not found")
+    }
+    val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
+        .getCarbonTable(tableUniqueName)
+    if (carbonTable == null) {
+      sys.error(s"$databaseName.$tableName is not found")
+    }
+    val path = carbonTable.getMetaDataFilepath
+    val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
+    if (loadMetadataDetailsArray.nonEmpty) {
+
+      val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
+
+      var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith(
+        (l1, l2) => java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double
+            .parseDouble(l2.getLoadName)
+      )
+
+
+      if (limit.isDefined) {
+        loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray
+            .filter(load => load.getVisibility.equalsIgnoreCase("true"))
+        val limitLoads = limit.get
+        try {
+          val lim = Integer.parseInt(limitLoads)
+          loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray.slice(0, lim)
+        } catch {
+          case ex: NumberFormatException => sys.error("Entered limit is not a valid number")
+        }
+
+      }
+
+      loadMetadataDetailsSortedArray.filter(load => load.getVisibility.equalsIgnoreCase("true"))
+          .map(load =>
+            Row(
+              load.getLoadName,
+              load.getLoadStatus,
+              new java.sql.Timestamp(parser.parse(load.getLoadStartTime).getTime),
+              new java.sql.Timestamp(parser.parse(load.getTimestamp).getTime))).toSeq
+    } else {
+      Seq.empty
+
+    }
+  }
+
+}
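+
+// Sketch; returns one Row per visible segment (load name, status, start time, end timestamp).
+// `output` is the attribute list supplied by the planner; other names are examples:
+//   ShowLoads(Some("default"), "sales", Some("10"), output).run(sparkSession)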

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
new file mode 100644
index 0000000..86c55d3
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+
+/**
+ * This object contains all Carbon Hive metadata related utilities
+ */
+object CarbonHiveMetadataUtil {
+
+  @transient
+  val LOGGER = LogServiceFactory.getLogService(CarbonHiveMetadataUtil.getClass.getName)
+
+
+  /**
+   * This method invalidates the table in HiveMetastoreCatalog before dropping it
+   *
+   * @param schemaName
+   * @param cubeName
+   * @param sparkSession
+   */
+  def invalidateAndDropTable(schemaName: String,
+      cubeName: String,
+      sparkSession: SparkSession): Unit = {
+    val tableWithDb = schemaName + "." + cubeName
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableWithDb)
+    try {
+      // TODO(wf): Spark currently has no invalidate method
+      sparkSession.sql(s"DROP TABLE IF EXISTS $schemaName.$cubeName")
+    } catch {
+      case e: Exception =>
+        LOGGER.audit(
+          s"Error While deleting the table $schemaName.$cubeName during drop carbon table" +
+          e.getMessage)
+    }
+  }
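+
+  // Sketch (example names):
+  //   CarbonHiveMetadataUtil.invalidateAndDropTable("default", "sales", sparkSession)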
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
new file mode 100644
index 0000000..98b481e
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io._
+import java.util.{GregorianCalendar, LinkedHashSet, UUID}
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.Array.canBuildFrom
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+import scala.util.parsing.combinator.RegexParsers
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType
+import org.apache.carbondata.core.reader.ThriftReader
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.writer.ThriftWriter
+import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.lcm.locks.ZookeeperInit
+import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.spark.merger.TableMeta
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
+
+case class CarbonMetaData(dims: Seq[String],
+    msrs: Seq[String],
+    carbonTable: CarbonTable,
+    dictionaryMap: DictionaryMap)
+
+object CarbonMetastore {
+
+  def readSchemaFileToThriftTable(schemaFilePath: String): TableInfo = {
+    val createTBase = new ThriftReader.TBaseCreator() {
+      override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
+        new TableInfo()
+      }
+    }
+    val thriftReader = new ThriftReader(schemaFilePath, createTBase)
+    var tableInfo: TableInfo = null
+    try {
+      thriftReader.open()
+      tableInfo = thriftReader.read().asInstanceOf[TableInfo]
+    } finally {
+      thriftReader.close()
+    }
+    tableInfo
+  }
+
+  def writeThriftTableToSchemaFile(schemaFilePath: String, tableInfo: TableInfo): Unit = {
+    val thriftWriter = new ThriftWriter(schemaFilePath, false)
+    try {
+      thriftWriter.open()
+      thriftWriter.write(tableInfo)
+    } finally {
+      thriftWriter.close()
+    }
+  }
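+
+  // Round-trip sketch using the two helpers above (the schema file path is hypothetical):
+  //   val info = CarbonMetastore.readSchemaFileToThriftTable("/store/default/sales/Metadata/schema")
+  //   CarbonMetastore.writeThriftTableToSchemaFile("/store/default/sales/Metadata/schema", info)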
+
+}
+
+case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
+  def get(name: String): Option[Boolean] = {
+    dictionaryMap.get(name.toLowerCase)
+  }
+}
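+
+// Sketch: DictionaryMap(Map("city" -> true)).get("CITY") returns Some(true); lookups are
+// lower-cased, so column names in any case resolve to the same entry.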
+
+class CarbonMetastore(conf: RuntimeConfig, val storePath: String) extends Logging {
+
+  @transient
+  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
+
+  val tableModifiedTimeStore = new java.util.HashMap[String, Long]()
+  tableModifiedTimeStore
+    .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis())
+
+  private val nextId = new AtomicLong(0)
+
+  def nextQueryId: String = {
+    s"query_${nextId.getAndIncrement()}"
+  }
+
+  val metadata = loadMetadata(storePath, nextQueryId)
+
+  def getTableCreationTime(databaseName: String, tableName: String): Long = {
+    val tableMeta = metadata.tablesMeta.filter(
+      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(databaseName) &&
+           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+    val tableCreationTime = tableMeta.head.carbonTable.getTableLastUpdatedTime
+    tableCreationTime
+  }
+
+  def lookupRelation(dbName: Option[String],
+                     tableName: String)(sparkSession: SparkSession): LogicalPlan = {
+    lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
+  }
+
+  def lookupRelation(tableIdentifier: TableIdentifier,
+                     alias: Option[String] = None)(sparkSession: SparkSession): LogicalPlan = {
+    checkSchemasModifiedTimeAndReloadTables()
+    val database = tableIdentifier.database.getOrElse(
+      sparkSession.catalog.currentDatabase
+    )
+    val tables = getTableFromMetadata(database, tableIdentifier.table)
+    tables match {
+      case Some(t) =>
+        CarbonRelation(database, tableIdentifier.table,
+          CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head, alias)
+      case None =>
+        LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
+        throw new NoSuchTableException(database, tableIdentifier.table)
+    }
+  }
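+
+  // Call-site sketch (mirrors the usage in carbonTableSchema.scala; names are examples):
+  //   carbonMetastore.lookupRelation(TableIdentifier("sales", Some("default")))(sparkSession)
+  //     .asInstanceOf[CarbonRelation]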
+
+  /**
+   * This method will search for a table in the catalog metadata
+   *
+   * @param database
+   * @param tableName
+   * @return
+   */
+  def getTableFromMetadata(database: String,
+      tableName: String): Option[TableMeta] = {
+    metadata.tablesMeta
+      .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+                 c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+  }
+
+  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+    checkSchemasModifiedTimeAndReloadTables()
+    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tables = metadata.tablesMeta.filter(
+      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+    tables.nonEmpty
+  }
+
+  def loadMetadata(metadataPath: String, queryId: String): MetaData = {
+    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+    val statistic = new QueryStatistic()
+    // create the zookeeper instance once, if zookeeper is configured as the carbon lock type
+    val zookeeperUrl: String = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
+    if (zookeeperUrl != null) {
+      CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperUrl)
+      ZookeeperInit.getInstance(zookeeperUrl)
+      LOGGER.info("Zookeeper url is configured. Taking the zookeeper as lock type.")
+      var configuredLockType = CarbonProperties.getInstance
+        .getProperty(CarbonCommonConstants.LOCK_TYPE)
+      if (null == configuredLockType) {
+        configuredLockType = CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
+        CarbonProperties.getInstance
+          .addProperty(CarbonCommonConstants.LOCK_TYPE,
+            configuredLockType)
+      }
+    }
+
+    if (metadataPath == null) {
+      return null
+    }
+    val fileType = FileFactory.getFileType(metadataPath)
+    val metaDataBuffer = new ArrayBuffer[TableMeta]
+    fillMetaData(metadataPath, fileType, metaDataBuffer)
+    updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
+    statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
+      System.currentTimeMillis())
+    recorder.recordStatisticsForDriver(statistic, queryId)
+    MetaData(metaDataBuffer)
+  }
+
+  private def fillMetaData(basePath: String, fileType: FileType,
+      metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
+    val databasePath = basePath // + "/schemas"
+    try {
+      if (FileFactory.isFileExist(databasePath, fileType)) {
+        val file = FileFactory.getCarbonFile(databasePath, fileType)
+        val databaseFolders = file.listFiles()
+
+        databaseFolders.foreach(databaseFolder => {
+          if (databaseFolder.isDirectory) {
+            val dbName = databaseFolder.getName
+            val tableFolders = databaseFolder.listFiles()
+
+            tableFolders.foreach(tableFolder => {
+              if (tableFolder.isDirectory) {
+                val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
+                  tableFolder.getName, UUID.randomUUID().toString)
+                val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
+                  carbonTableIdentifier)
+                val tableMetadataFile = carbonTablePath.getSchemaFilePath
+
+                if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+                  val tableName = tableFolder.getName
+                  val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
+
+
+                  val createTBase = new ThriftReader.TBaseCreator() {
+                    override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
+                      new TableInfo()
+                    }
+                  }
+                  val thriftReader = new ThriftReader(tableMetadataFile, createTBase)
+                  thriftReader.open()
+                  val tableInfo: TableInfo = thriftReader.read().asInstanceOf[TableInfo]
+                  thriftReader.close()
+
+                  val schemaConverter = new ThriftWrapperSchemaConverterImpl
+                  val wrapperTableInfo = schemaConverter
+                    .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
+                  val schemaFilePath = CarbonStorePath
+                    .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+                  wrapperTableInfo.setStorePath(storePath)
+                  wrapperTableInfo
+                    .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+                  CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+                  val carbonTable =
+                    org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
+                      .getCarbonTable(tableUniqueName)
+                  metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
+                    carbonTable)
+                }
+              }
+            })
+          }
+        })
+      } else {
+        // Create folders and files.
+        FileFactory.mkdirs(databasePath, fileType)
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Create folders and files.
+        FileFactory.mkdirs(databasePath, fileType)
+    }
+  }
+
+  /**
+   *
+   * Prepares the Thrift schema from the wrapper TableInfo, writes it to the schema file,
+   * and loads the CarbonTable from the wrapper TableInfo.
+   *
+   */
+  def createTableFromThrift(
+      tableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo,
+      dbName: String, tableName: String)
+    (sparkSession: SparkSession): String = {
+    if (tableExists(TableIdentifier(tableName, Some(dbName)))(sparkSession)) {
+      sys.error(s"Table [$tableName] already exists under Database [$dbName]")
+    }
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val thriftTableInfo = schemaConverter
+      .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
+    val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
+    thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
+      .add(schemaEvolutionEntry)
+
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
+      tableInfo.getFactTable.getTableId)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
+    tableInfo.setMetaDataFilepath(schemaMetadataPath)
+    tableInfo.setStorePath(storePath)
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+      CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName))
+
+    val fileType = FileFactory.getFileType(schemaMetadataPath)
+    if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+      FileFactory.mkdirs(schemaMetadataPath, fileType)
+    }
+    val thriftWriter = new ThriftWriter(schemaFilePath, false)
+    thriftWriter.open()
+    thriftWriter.write(thriftTableInfo)
+    thriftWriter.close()
+    metadata.tablesMeta += tableMeta
+    logInfo(s"Table $tableName for Database $dbName created successfully.")
+    LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+    carbonTablePath.getPath
+  }
+
+  private def updateMetadataByWrapperTable(
+      wrapperTableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo): Unit = {
+
+    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      wrapperTableInfo.getTableUniqueName)
+    for (i <- metadata.tablesMeta.indices) {
+      if (wrapperTableInfo.getTableUniqueName.equals(
+        metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
+        metadata.tablesMeta(i).carbonTable = carbonTable
+      }
+    }
+  }
+
+  def updateMetadataByThriftTable(schemaFilePath: String,
+      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
+
+    tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+      .setTime_stamp(System.currentTimeMillis())
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+    wrapperTableInfo
+      .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+    wrapperTableInfo.setStorePath(storePath)
+    updateMetadataByWrapperTable(wrapperTableInfo)
+  }
+
+  /**
+   * Shows all schemas whose database name matches the given pattern
+   */
+  def showDatabases(schemaLike: Option[String]): Seq[String] = {
+    checkSchemasModifiedTimeAndReloadTables()
+    metadata.tablesMeta.map { c =>
+      schemaLike match {
+        case Some(name) =>
+          if (c.carbonTableIdentifier.getDatabaseName.contains(name)) {
+            c.carbonTableIdentifier
+              .getDatabaseName
+          } else {
+            null
+          }
+        case _ => c.carbonTableIdentifier.getDatabaseName
+      }
+    }.filter(f => f != null)
+  }
+
+  /**
+   * Shows all tables in all schemas.
+   */
+  def getAllTables(): Seq[TableIdentifier] = {
+    checkSchemasModifiedTimeAndReloadTables()
+    metadata.tablesMeta.map { c =>
+      TableIdentifier(c.carbonTableIdentifier.getTableName,
+        Some(c.carbonTableIdentifier.getDatabaseName))
+    }
+  }
+
+  def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableName = tableIdentifier.table
+
+    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
+      new CarbonTableIdentifier(dbName, tableName, "")).getPath
+
+    val fileType = FileFactory.getFileType(tablePath)
+    FileFactory.isFileExist(tablePath, fileType)
+  }
+
+  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession) {
+    val dbName = tableIdentifier.database.get
+    val tableName = tableIdentifier.table
+
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+
+    val fileType = FileFactory.getFileType(metadataFilePath)
+
+    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+      // while dropping, refresh the schema modified time so that any change made from
+      // another beeline session is picked up
+      checkSchemasModifiedTimeAndReloadTables
+      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+      val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
+        tableIdentifier.table)
+      metadataToBeRemoved match {
+        case Some(tableMeta) =>
+          metadata.tablesMeta -= tableMeta
+          org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
+            .removeTable(dbName + "_" + tableName)
+          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+        case None =>
+          logInfo(s"Metadata does not contain entry for table $tableName in database $dbName")
+      }
+      CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
+      // discard cached table info in cachedDataSourceTables
+      sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+    }
+  }
+
+  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
+    val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+    val timestampFileType = FileFactory.getFileType(timestampFile)
+    (timestampFile, timestampFileType)
+  }
+
+  /**
+   * This method will put the updated timestamp of schema file in the table modified time store map
+   *
+   * @param timeStamp
+   */
+  def updateSchemasUpdatedTime(timeStamp: Long) {
+    tableModifiedTimeStore.put("default", timeStamp)
+  }
+
+  /**
+   * This method will read the timestamp of empty schema file
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
+    } else {
+      System.currentTimeMillis()
+    }
+  }
+
+  /**
+   * This method will check and create an empty schema timestamp file
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+    if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
+      FileFactory.createNewFile(timestampFile, timestampFileType)
+    }
+    val systemTime = System.currentTimeMillis()
+    FileFactory.getCarbonFile(timestampFile, timestampFileType)
+      .setLastModifiedTime(systemTime)
+    systemTime
+  }
+
+  def checkSchemasModifiedTimeAndReloadTables() {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
+    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
+        getLastModifiedTime ==
+            tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
+        refreshCache()
+      }
+    }
+  }
+
+  def refreshCache() {
+    metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
+  }
+
+  def getSchemaLastUpdatedTime(databaseName: String, tableName: String): Long = {
+    var schemaLastUpdatedTime = System.currentTimeMillis
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      schemaLastUpdatedTime = FileFactory.getCarbonFile(timestampFile, timestampFileType)
+        .getLastModifiedTime
+    }
+    schemaLastUpdatedTime
+  }
+
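+  /**
+   * Reads a legacy table metadata file made of length-prefixed UTF-8 fields: database name,
+   * table name, data path, version, schema, a serialized Partitioner and the table creation time.
+   */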
+  def readTableMetaDataFile(tableFolder: CarbonFile,
+      fileType: FileFactory.FileType):
+  (String, String, String, String, Partitioner, Long) = {
+    val tableMetadataFile = tableFolder.getAbsolutePath + "/metadata"
+
+    var schema: String = ""
+    var databaseName: String = ""
+    var tableName: String = ""
+    var dataPath: String = ""
+    var partitioner: Partitioner = null
+    val cal = new GregorianCalendar(2011, 1, 1)
+    var tableCreationTime = cal.getTime.getTime
+
+    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+      // load metadata
+      val in = FileFactory.getDataInputStream(tableMetadataFile, fileType)
+      var len = 0
+      try {
+        len = in.readInt()
+      } catch {
+        case others: EOFException => len = 0
+      }
+
+      while (len > 0) {
+        val databaseNameBytes = new Array[Byte](len)
+        in.readFully(databaseNameBytes)
+
+        databaseName = new String(databaseNameBytes, "UTF8")
+        val tableNameLen = in.readInt()
+        val tableNameBytes = new Array[Byte](tableNameLen)
+        in.readFully(tableNameBytes)
+        tableName = new String(tableNameBytes, "UTF8")
+
+        val dataPathLen = in.readInt()
+        val dataPathBytes = new Array[Byte](dataPathLen)
+        in.readFully(dataPathBytes)
+        dataPath = new String(dataPathBytes, "UTF8")
+
+        val versionLength = in.readInt()
+        val versionBytes = new Array[Byte](versionLength)
+        in.readFully(versionBytes)
+
+        val schemaLen = in.readInt()
+        val schemaBytes = new Array[Byte](schemaLen)
+        in.readFully(schemaBytes)
+        schema = new String(schemaBytes, "UTF8")
+
+        val partitionLength = in.readInt()
+        val partitionBytes = new Array[Byte](partitionLength)
+        in.readFully(partitionBytes)
+        val inStream = new ByteArrayInputStream(partitionBytes)
+        val objStream = new ObjectInputStream(inStream)
+        partitioner = objStream.readObject().asInstanceOf[Partitioner]
+        objStream.close()
+
+        try {
+          tableCreationTime = in.readLong()
+          len = in.readInt()
+        } catch {
+          case others: EOFException => len = 0
+        }
+
+      }
+      in.close()
+    }
+
+    (databaseName, tableName, dataPath, schema, partitioner, tableCreationTime)
+  }
+
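+  /** Creates the directory for the given database under the store path. */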
+  def createDatabaseDirectory(dbName: String) {
+    val databasePath = storePath + File.separator + dbName
+    val fileType = FileFactory.getFileType(databasePath)
+    FileFactory.mkdirs(databasePath, fileType)
+  }
+
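+  /** Deletes the directory of the given database under the store path, if it exists. */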
+  def dropDatabaseDirectory(dbName: String) {
+    val databasePath = storePath + File.separator + dbName
+    val fileType = FileFactory.getFileType(databasePath)
+    if (FileFactory.isFileExist(databasePath, fileType)) {
+      val dbPath = FileFactory.getCarbonFile(databasePath, fileType)
+      CarbonUtil.deleteFoldersAndFiles(dbPath)
+    }
+  }
+
+}
+
+
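+/**
+ * Parser combinators converting between Hive metastore type strings and Spark SQL DataTypes.
+ * For illustration only (not part of this change):
+ *   CarbonMetastoreTypes.toDataType("array<struct<a:int,b:string>>")
+ *   // => ArrayType(StructType(Seq(StructField("a", IntegerType), StructField("b", StringType))))
+ */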
+object CarbonMetastoreTypes extends RegexParsers {
+  protected lazy val primitiveType: Parser[DataType] =
+    "string" ^^^ StringType |
+    "float" ^^^ FloatType |
+    "int" ^^^ IntegerType |
+    "tinyint" ^^^ ShortType |
+    "short" ^^^ ShortType |
+    "double" ^^^ DoubleType |
+    "long" ^^^ LongType |
+    "binary" ^^^ BinaryType |
+    "boolean" ^^^ BooleanType |
+    fixedDecimalType |
+    "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
+    "varchar\\((\\d+)\\)".r ^^^ StringType |
+    "timestamp" ^^^ TimestampType
+
+  protected lazy val fixedDecimalType: Parser[DataType] =
+    "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
+      case precision ~ scale =>
+        DecimalType(precision.toInt, scale.toInt)
+    }
+
+  protected lazy val arrayType: Parser[DataType] =
+    "array" ~> "<" ~> dataType <~ ">" ^^ {
+      case tpe => ArrayType(tpe)
+    }
+
+  protected lazy val mapType: Parser[DataType] =
+    "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
+      case t1 ~ _ ~ t2 => MapType(t1, t2)
+    }
+
+  protected lazy val structField: Parser[StructField] =
+    "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
+      case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
+    }
+
+  protected lazy val structType: Parser[DataType] =
+    "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
+      case fields => StructType(fields)
+    }
+
+  protected lazy val dataType: Parser[DataType] =
+    arrayType |
+    mapType |
+    structType |
+    primitiveType
+
+  def toDataType(metastoreType: String): DataType = {
+    parseAll(dataType, metastoreType) match {
+      case Success(result, _) => result
+      case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
+    }
+  }
+
+  def toMetastoreType(dt: DataType): String = {
+    dt match {
+      case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
+      case StructType(fields) =>
+        s"struct<${
+          fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
+            .mkString(",")
+        }>"
+      case StringType => "string"
+      case FloatType => "float"
+      case IntegerType => "int"
+      case ShortType => "tinyint"
+      case DoubleType => "double"
+      case LongType => "bigint"
+      case BinaryType => "binary"
+      case BooleanType => "boolean"
+      case DecimalType() => "decimal"
+      case TimestampType => "timestamp"
+    }
+  }
+}
+
+
+/**
+ * Represents logical plan for one carbon table
+ */
+case class CarbonRelation(
+    databaseName: String,
+    tableName: String,
+    metaData: CarbonMetaData,
+    tableMeta: TableMeta,
+    alias: Option[String])
+  extends LeafNode with MultiInstanceRelation {
+
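+  // Builds the metastore type fragment for a nested child dimension, recursing into its array
+  // and struct children.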
+  def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
+    childDim.getDataType.toString.toLowerCase match {
+      case "array" => s"${
+        childDim.getColName.substring(dimName.length + 1)
+      }:array<${ getArrayChildren(childDim.getColName) }>"
+      case "struct" => s"${
+        childDim.getColName.substring(dimName.length + 1)
+      }:struct<${ getStructChildren(childDim.getColName) }>"
+      case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
+    }
+  }
+
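+  // Renders the element types of an array dimension as a comma separated metastore type string.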
+  def getArrayChildren(dimName: String): String = {
+    metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
+      childDim.getDataType.toString.toLowerCase match {
+        case "array" => s"array<${ getArrayChildren(childDim.getColName) }>"
+        case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>"
+        case dType => addDecimalScaleAndPrecision(childDim, dType)
+      }
+    }).mkString(",")
+  }
+
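+  // Renders the fields of a struct dimension as a comma separated metastore type string.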
+  def getStructChildren(dimName: String): String = {
+    metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
+      childDim.getDataType.toString.toLowerCase match {
+        case "array" => s"${
+          childDim.getColName.substring(dimName.length + 1)
+        }:array<${ getArrayChildren(childDim.getColName) }>"
+        case "struct" => s"${
+          childDim.getColName.substring(dimName.length + 1)
+        }:struct<${ metaData.carbonTable.getChildren(childDim.getColName)
+          .asScala.map(f => s"${ recursiveMethod(childDim.getColName, f) }").mkString(",")
+        }>"
+        case dType => s"${ childDim.getColName
+          .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
+      }
+    }).mkString(",")
+  }
+
+  override def newInstance(): LogicalPlan = {
+    CarbonRelation(databaseName, tableName, metaData, tableMeta, alias)
+      .asInstanceOf[this.type]
+  }
+
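+  // Attribute references for all visible dimensions of the fact table, with Carbon data types
+  // mapped to Spark SQL types through CarbonMetastoreTypes.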
+  val dimensionsAttr = {
+    val sett = new LinkedHashSet(
+      tableMeta.carbonTable.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName)
+        .asScala.asJava)
+    sett.asScala.toSeq.filter(!_.getColumnSchema.isInvisible).map(dim => {
+      val dimval = metaData.carbonTable
+        .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
+      val output: DataType = dimval.getDataType
+        .toString.toLowerCase match {
+        case "array" =>
+          CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
+        case "struct" =>
+          CarbonMetastoreTypes.toDataType(s"struct<${ getStructChildren(dim.getColName) }>")
+        case dType =>
+          val dataType = addDecimalScaleAndPrecision(dimval, dType)
+          CarbonMetastoreTypes.toDataType(dataType)
+      }
+
+      AttributeReference(
+        dim.getColName,
+        output,
+        nullable = true)()
+    })
+  }
+
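+  // Attribute references for all visible measures; int and short measures are widened to long,
+  // and decimal measures carry their precision and scale.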
+  val measureAttr = {
+    val factTable = tableMeta.carbonTable.getFactTableName
+    new LinkedHashSet(
+      tableMeta.carbonTable.
+        getMeasureByTableName(factTable).
+        asScala.asJava).asScala.toSeq.filter(!_.getColumnSchema.isInvisible)
+      .map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType(
+        metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.toString
+          .toLowerCase match {
+          case "int" => "long"
+          case "short" => "long"
+          case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
+          case others => others
+        }),
+        nullable = true)())
+  }
+
+  override val output = dimensionsAttr ++ measureAttr
+
+  // TODO: Use data from the footers.
+  override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case p: CarbonRelation =>
+        p.databaseName == databaseName && p.output == output && p.tableName == tableName
+      case _ => false
+    }
+  }
+
+  def addDecimalScaleAndPrecision(dimval: CarbonDimension, dataType: String): String = {
+    var dType = dataType
+    if (dimval.getDataType
+      == org.apache.carbondata.core.carbon.metadata.datatype.DataType.DECIMAL) {
+      dType +=
+        "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
+    }
+    dType
+  }
+
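+  // Cached table size in bytes, recomputed only when the table status file has been modified
+  // since the last computation.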
+  private var tableStatusLastUpdateTime = 0L
+
+  private var sizeInBytesLocalValue = 0L
+
+  def sizeInBytes: Long = {
+    val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
+      tableMeta.carbonTable.getAbsoluteTableIdentifier)
+
+    if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
+      val tablePath = CarbonStorePath.getCarbonTablePath(
+        tableMeta.storePath,
+        tableMeta.carbonTableIdentifier).getPath
+      val fileType = FileFactory.getFileType(tablePath)
+      if(FileFactory.isFileExist(tablePath, fileType)) {
+        tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
+        sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
+      }
+    }
+    sizeInBytesLocalValue
+  }
+
+}

