carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [09/15] carbondata git commit: [CARBONDATA-1301] change command to update schema and data separately
Date Mon, 17 Jul 2017 01:56:55 GMT
[CARBONDATA-1301] change command to update schema and data separately

This closes #1160


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c7f0b108
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c7f0b108
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c7f0b108

Branch: refs/heads/datamap
Commit: c7f0b1081c6055970458712bf29d874fe95c7004
Parents: 2a9debf
Author: jackylk <jacky.likun@huawei.com>
Authored: Wed Jul 12 00:40:44 2017 +0800
Committer: Ravindra Pesala <ravi.pesala@gmail.com>
Committed: Wed Jul 12 21:07:14 2017 +0530

----------------------------------------------------------------------
 .../sql/execution/command/IUDCommands.scala     | 146 +++----
 .../execution/command/carbonTableSchema.scala   | 414 ++++++++++---------
 .../spark/sql/hive/CarbonFileMetastore.scala    |   3 +-
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |  11 -
 4 files changed, 296 insertions(+), 278 deletions(-)
----------------------------------------------------------------------
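
For context, the pattern this patch applies across the commands is sketched below. This is a minimal illustration, not part of the patch itself: it assumes Spark 2.x's RunnableCommand API, and ExampleDropCommand is a hypothetical stand-in for CarbonDropTableCommand. Schema-only work goes through processSchema, file-system/data work through processData, and run() simply delegates to one or both.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.command.RunnableCommand

// Interface for commands that modify schema (as introduced in carbonTableSchema.scala)
trait SchemaProcessCommand {
  def processSchema(sparkSession: SparkSession): Seq[Row]
}

// Interface for commands that process data in the file system
trait DataProcessCommand {
  def processData(sparkSession: SparkSession): Seq[Row]
}

// Hypothetical command touching both schema and data, mirroring how
// CarbonDropTableCommand is restructured in this change.
case class ExampleDropCommand(tableName: String) extends RunnableCommand
    with SchemaProcessCommand with DataProcessCommand {

  // run() only orchestrates; the schema and data steps stay separate
  override def run(sparkSession: SparkSession): Seq[Row] = {
    processSchema(sparkSession)
    processData(sparkSession)
  }

  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
    // schema-level step, e.g. drop the table from the metastore
    Seq.empty
  }

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    // data-level step, e.g. delete the table folder on disk
    Seq.empty
  }
}
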


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7f0b108/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index 2c1de52..d3a80d4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -58,31 +58,35 @@ import org.apache.carbondata.spark.util.QueryPlanUtil
 private[sql] case class ProjectForDeleteCommand(
      plan: LogicalPlan,
      identifier: Seq[String],
-     timestamp: String) extends RunnableCommand {
+     timestamp: String) extends RunnableCommand with DataProcessCommand {
 
-  val LOG = LogServiceFactory.getLogService(this.getClass.getName)
   var horizontalCompactionFailed = false
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val dataFrame = Dataset.ofRows(sparkSession, plan)
-//    dataFrame.show(truncate = false)
-//    dataFrame.collect().foreach(println)
+    //    dataFrame.show(truncate = false)
+    //    dataFrame.collect().foreach(println)
     val dataRdd = dataFrame.rdd
 
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(deleteExecution.getTableIdentifier(identifier))(sparkSession).
-      asInstanceOf[CarbonRelation]
+        .lookupRelation(deleteExecution.getTableIdentifier(identifier))(sparkSession).
+        asInstanceOf[CarbonRelation]
     val carbonTable = relation.tableMeta.carbonTable
     val metadataLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        LockUsage.METADATA_LOCK)
+        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+          LockUsage.METADATA_LOCK)
     var lockStatus = false
     try {
       lockStatus = metadataLock.lockWithRetries()
-      LOG.audit(s" Delete data request has been received " +
-                s"for ${ relation.databaseName }.${ relation.tableName }.")
+      LOGGER.audit(s" Delete data request has been received " +
+          s"for ${ relation.databaseName }.${ relation.tableName }.")
       if (lockStatus) {
-        LOG.info("Successfully able to get the table metadata file lock")
+        LOGGER.info("Successfully able to get the table metadata file lock")
       }
       else {
         throw new Exception("Table is locked for deletion. Please try after some time")
@@ -92,23 +96,23 @@ private[sql] case class ProjectForDeleteCommand(
         carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
       var executorErrors = new ExecutionErrors(FailureCauses.NONE, "")
 
-        // handle the clean up of IUD.
-        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
+      // handle the clean up of IUD.
+      CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
-          if (deleteExecution
-            .deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp, relation,
-              false, executorErrors)) {
-            // call IUD Compaction.
-            IUDCommon.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = false)
-          }
+      if (deleteExecution
+          .deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp, relation,
+            false, executorErrors)) {
+        // call IUD Compaction.
+        IUDCommon.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = false)
+      }
     } catch {
       case e: HorizontalCompactionException =>
-          LOG.error("Delete operation passed. Exception in Horizontal Compaction." +
-              " Please check logs. " + e.getMessage)
-          CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
+        LOGGER.error("Delete operation passed. Exception in Horizontal Compaction." +
+            " Please check logs. " + e.getMessage)
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
 
       case e: Exception =>
-        LOG.error(e, "Exception in Delete data operation " + e.getMessage)
+        LOGGER.error(e, "Exception in Delete data operation " + e.getMessage)
         // ****** start clean up.
         // In case of failure , clean all related delete delta files
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
@@ -130,47 +134,51 @@ private[sql] case class ProjectForDeleteCommand(
 }
 
 private[sql] case class ProjectForUpdateCommand(
-    plan: LogicalPlan, tableIdentifier: Seq[String]) extends RunnableCommand {
-  val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName)
+    plan: LogicalPlan, tableIdentifier: Seq[String]) extends RunnableCommand
+    with DataProcessCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
 
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName)
 
-   //  sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution
+    //  sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution
     //  .EXECUTION_ID_KEY, null)
     // DataFrame(sqlContext, plan).show(truncate = false)
     // return Seq.empty
 
 
     val res = plan find {
-      case relation: LogicalRelation if (relation.relation
-        .isInstanceOf[CarbonDatasourceHadoopRelation]) =>
+      case relation: LogicalRelation if relation.relation
+          .isInstanceOf[CarbonDatasourceHadoopRelation] =>
         true
       case _ => false
     }
 
-    if (!res.isDefined) {
+    if (res.isEmpty) {
       return Seq.empty
     }
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(deleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
-      asInstanceOf[CarbonRelation]
-//    val relation = CarbonEnv.get.carbonMetastore
-//      .lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext).
-//      asInstanceOf[CarbonRelation]
+        .lookupRelation(deleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
+        asInstanceOf[CarbonRelation]
+    //    val relation = CarbonEnv.get.carbonMetastore
+    //      .lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext).
+    //      asInstanceOf[CarbonRelation]
     val carbonTable = relation.tableMeta.carbonTable
     val metadataLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        LockUsage.METADATA_LOCK)
+        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+          LockUsage.METADATA_LOCK)
     var lockStatus = false
     // get the current time stamp which should be same for delete and update.
     val currentTime = CarbonUpdateUtil.readCurrentTime
-//    var dataFrame: DataFrame = null
+    //    var dataFrame: DataFrame = null
     var dataSet: DataFrame = null
     val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.isPersistEnabled,
-        CarbonCommonConstants.defaultValueIsPersistEnabled)
-   var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean
+        .getProperty(CarbonCommonConstants.isPersistEnabled,
+          CarbonCommonConstants.defaultValueIsPersistEnabled)
+    var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean
     if (isPersistEnabledUserValue.equalsIgnoreCase("false")) {
       isPersistEnabled = false
     }
@@ -188,48 +196,46 @@ private[sql] case class ProjectForUpdateCommand(
       val tablePath = CarbonStorePath.getCarbonTablePath(
         carbonTable.getStorePath,
         carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
-        // Get RDD.
+      // Get RDD.
 
       dataSet = if (isPersistEnabled) {
-          Dataset.ofRows(sparkSession, plan).persist(StorageLevel.MEMORY_AND_DISK)
-//          DataFrame(sqlContext, plan)
-//            .persist(StorageLevel.MEMORY_AND_DISK)
-        }
-        else {
-          Dataset.ofRows(sparkSession, plan)
-//          DataFrame(sqlContext, plan)
-        }
-        var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+        Dataset.ofRows(sparkSession, plan).persist(StorageLevel.MEMORY_AND_DISK)
+        //          DataFrame(sqlContext, plan)
+        //            .persist(StorageLevel.MEMORY_AND_DISK)
+      }
+      else {
+        Dataset.ofRows(sparkSession, plan)
+        //          DataFrame(sqlContext, plan)
+      }
+      var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
 
 
-        // handle the clean up of IUD.
-        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
+      // handle the clean up of IUD.
+      CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
-        // do delete operation.
-        deleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
-          currentTime + "",
+      // do delete operation.
+      deleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
+        currentTime + "",
         relation, isUpdateOperation = true, executionErrors)
 
-        if(executionErrors.failureCauses != FailureCauses.NONE) {
-          throw new Exception(executionErrors.errorMsg)
-        }
-
-        // do update operation.
-        UpdateExecution.performUpdate(dataSet, tableIdentifier, plan,
-          sparkSession, currentTime, executionErrors)
+      if(executionErrors.failureCauses != FailureCauses.NONE) {
+        throw new Exception(executionErrors.errorMsg)
+      }
 
-        if(executionErrors.failureCauses != FailureCauses.NONE) {
-          throw new Exception(executionErrors.errorMsg)
-        }
+      // do update operation.
+      UpdateExecution.performUpdate(dataSet, tableIdentifier, plan,
+        sparkSession, currentTime, executionErrors)
 
-        // Do IUD Compaction.
-        IUDCommon.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true)
-    }
+      if(executionErrors.failureCauses != FailureCauses.NONE) {
+        throw new Exception(executionErrors.errorMsg)
+      }
 
-    catch {
+      // Do IUD Compaction.
+      IUDCommon.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true)
+    } catch {
       case e: HorizontalCompactionException =>
         LOGGER.error(
-            "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e)
+          "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e)
         // In case of failure , clean all related delta files
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7f0b108/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 88e89ad..cc18fa3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -72,18 +72,36 @@ object Checker {
 }
 
 /**
+ * Interface for command that modifies schema
+ */
+trait SchemaProcessCommand {
+  def processSchema(sparkSession: SparkSession): Seq[Row]
+}
+
+/**
+ * Interface for command that need to process data in file system
+ */
+trait DataProcessCommand {
+  def processData(sparkSession: SparkSession): Seq[Row]
+}
+
+/**
  * Command for show table partitions Command
  *
  * @param tableIdentifier
  */
 private[sql] case class ShowCarbonPartitionsCommand(
-    tableIdentifier: TableIdentifier) extends RunnableCommand {
-  val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName)
+    tableIdentifier: TableIdentifier) extends RunnableCommand with SchemaProcessCommand {
+
   override val output = CommonUtil.partitionInfoOutput
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(tableIdentifier)(sparkSession).
-      asInstanceOf[CarbonRelation]
+        .lookupRelation(tableIdentifier)(sparkSession).
+        asInstanceOf[CarbonRelation]
     val carbonTable = relation.tableMeta.carbonTable
     var tableName = carbonTable.getFactTableName
     var partitionInfo = carbonTable.getPartitionInfo(
@@ -94,6 +112,7 @@ private[sql] case class ShowCarbonPartitionsCommand(
     }
     var partitionType = partitionInfo.getPartitionType
     var columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName
+    val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName)
     LOGGER.info("partition column name:" + columnName)
     CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
   }
@@ -104,17 +123,22 @@ private[sql] case class ShowCarbonPartitionsCommand(
  *
  * @param alterTableModel
  */
-case class AlterTableCompaction(alterTableModel: AlterTableModel) extends RunnableCommand {
+case class AlterTableCompaction(alterTableModel: AlterTableModel) extends RunnableCommand
+    with DataProcessCommand {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
     val tableName = alterTableModel.tableName.toLowerCase
     val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
     val relation =
       CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .lookupRelation(Option(databaseName), tableName)(sparkSession)
-        .asInstanceOf[CarbonRelation]
+          .lookupRelation(Option(databaseName), tableName)(sparkSession)
+          .asInstanceOf[CarbonRelation]
     if (relation == null) {
       sys.error(s"Table $databaseName.$tableName does not exist")
     }
@@ -135,18 +159,18 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
     carbonLoadModel.setStorePath(relation.tableMeta.storePath)
 
     var storeLocation = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
-        System.getProperty("java.io.tmpdir")
-      )
+        .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
+          System.getProperty("java.io.tmpdir")
+        )
     storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
     try {
       CarbonDataRDDFactory
-        .alterTableForCompaction(sparkSession.sqlContext,
-          alterTableModel,
-          carbonLoadModel,
-          relation.tableMeta.storePath,
-          storeLocation
-        )
+          .alterTableForCompaction(sparkSession.sqlContext,
+            alterTableModel,
+            carbonLoadModel,
+            relation.tableMeta.storePath,
+            storeLocation
+          )
     } catch {
       case e: Exception =>
         if (null != e.getMessage) {
@@ -159,9 +183,19 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
   }
 }
 
-case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand {
+case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand
+    with SchemaProcessCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+  }
+
+  def setV(ref: Any, name: String, value: Any): Unit = {
+    ref.getClass.getFields.find(_.getName == name).get
+      .set(ref, value.asInstanceOf[AnyRef])
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
     CarbonEnv.getInstance(sparkSession).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession)
@@ -176,11 +210,11 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
     }
 
     if (sparkSession.sessionState.catalog.listTables(dbName)
-      .exists(_.table.equalsIgnoreCase(tbName))) {
+        .exists(_.table.equalsIgnoreCase(tbName))) {
       if (!cm.ifNotExistsSet) {
         LOGGER.audit(
           s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
-          s"Table [$tbName] already exists under database [$dbName]")
+              s"Table [$tbName] already exists under database [$dbName]")
         sys.error(s"Table [$tbName] already exists under database [$dbName]")
       }
     } else {
@@ -196,48 +230,43 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
 
           sparkSession.sql(
             s"""CREATE TABLE $dbName.$tbName
-                |(${ fields.map(f => f.rawSchema).mkString(",") })
-                |USING org.apache.spark.sql.CarbonSource""".stripMargin +
-            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
-            s""""$tablePath" $carbonSchemaString) """)
+               |(${ fields.map(f => f.rawSchema).mkString(",") })
+               |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+                s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
+                s""""$tablePath" $carbonSchemaString) """)
         } catch {
           case e: Exception =>
             val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
             // call the drop table to delete the created table.
             CarbonEnv.getInstance(sparkSession).carbonMetastore
-              .dropTable(catalog.storePath, identifier)(sparkSession)
+                .dropTable(catalog.storePath, identifier)(sparkSession)
 
             LOGGER.audit(s"Table creation with Database name [$dbName] " +
-                         s"and Table name [$tbName] failed")
+                s"and Table name [$tbName] failed")
             throw e
         }
       }
 
       LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
     }
-
     Seq.empty
   }
-
-  def setV(ref: Any, name: String, value: Any): Unit = {
-    ref.getClass.getFields.find(_.getName == name).get
-      .set(ref, value.asInstanceOf[AnyRef])
-  }
 }
 
 case class DeleteLoadsById(
     loadids: Seq[String],
     databaseNameOp: Option[String],
-    tableName: String) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    tableName: String) extends RunnableCommand with DataProcessCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
 
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+        lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+        tableMeta.carbonTable
     CarbonStore.deleteLoadById(
       loadids,
       getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -245,24 +274,24 @@ case class DeleteLoadsById(
       carbonTable
     )
     Seq.empty
-
   }
-
 }
 
 case class DeleteLoadsByLoadDate(
     databaseNameOp: Option[String],
     tableName: String,
     dateField: String,
-    loadDate: String) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.TableModel.tableSchema")
+    loadDate: String) extends RunnableCommand with DataProcessCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+        lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+        tableMeta.carbonTable
     CarbonStore.deleteLoadByDate(
       loadDate,
       getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -271,7 +300,6 @@ case class DeleteLoadsByLoadDate(
     )
     Seq.empty
   }
-
 }
 
 object LoadTable {
@@ -318,10 +346,13 @@ object LoadTable {
 }
 
 case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan)
-  extends RunnableCommand {
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  extends RunnableCommand with DataProcessCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
     val df = Dataset.ofRows(sparkSession, child)
     val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
     val load = LoadTable(
@@ -335,7 +366,7 @@ case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: Lo
       Some(df)).run(sparkSession)
     // updating relation metadata. This is in case of auto detect high cardinality
     relation.carbonRelation.metaData =
-      CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
+        CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
     load
   }
 }
@@ -349,12 +380,10 @@ case class LoadTable(
     isOverwriteExist: Boolean = false,
     var inputSqlString: String = null,
     dataFrame: Option[DataFrame] = None,
-    updateModel: Option[UpdateTableModel] = None) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    updateModel: Option[UpdateTableModel] = None) extends RunnableCommand with DataProcessCommand {
 
-  private def getFinalOptions(carbonProperty: CarbonProperties): scala.collection
-  .mutable.Map[String, String] = {
+  private def getFinalOptions(carbonProperty: CarbonProperties):
+  scala.collection.mutable.Map[String, String] = {
     var optionsFinal = scala.collection.mutable.Map[String, String]()
     optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
     optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
@@ -415,14 +444,15 @@ case class LoadTable(
       case "false" =>
         // when single_pass = false  and if either alldictionarypath
         // or columnDict is configured the do not allow load
-        if (StringUtils.isNotEmpty(optionsFinal.get("all_dictionary_path").get) ||
-            StringUtils.isNotEmpty(optionsFinal.get("columndict").get)) {
+        if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
+            StringUtils.isNotEmpty(optionsFinal("columndict"))) {
           throw new MalformedCarbonCommandException(
             "Can not use all_dictionary_path or columndict without single_pass.")
         } else {
           false
         }
       case illegal =>
+        val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
         LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
                      "Please set it as 'true' or 'false'")
         false
@@ -440,7 +470,12 @@ case class LoadTable(
   }
 
   def run(sparkSession: SparkSession): Seq[Row] = {
-    if (dataFrame.isDefined && !updateModel.isDefined) {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    if (dataFrame.isDefined && updateModel.isEmpty) {
       val rdd = dataFrame.get.rdd
       if (rdd.partitions == null || rdd.partitions.length == 0) {
         LOGGER.warn("DataLoading finished. No data was loaded.")
@@ -452,8 +487,9 @@ case class LoadTable(
     if (isOverwriteExist) {
       sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
     }
+
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
     if (relation == null) {
       sys.error(s"Table $dbName.$tableName does not exist")
     }
@@ -467,10 +503,10 @@ case class LoadTable(
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
     val optionsFinal = getFinalOptions(carbonProperty)
     val carbonLock = CarbonLockFactory
-      .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
-        .getCarbonTableIdentifier,
-        LockUsage.METADATA_LOCK
-      )
+        .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
+            .getCarbonTableIdentifier,
+          LockUsage.METADATA_LOCK
+        )
     try {
       // take lock only in case of normal data load.
       if (updateModel.isEmpty) {
@@ -499,21 +535,21 @@ case class LoadTable(
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
 
       val partitionLocation = relation.tableMeta.storePath + "/partition/" +
-                              relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
-                              relation.tableMeta.carbonTableIdentifier.getTableName + "/"
+          relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
+          relation.tableMeta.carbonTableIdentifier.getTableName + "/"
       val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
-      val sort_scope = optionsFinal.get("sort_scope").get
-      val single_pass = optionsFinal.get("single_pass").get
-      val bad_records_logger_enable = optionsFinal.get("bad_records_logger_enable").get
-      val bad_records_action = optionsFinal.get("bad_records_action").get
-      val bad_record_path = optionsFinal.get("bad_record_path").get
-      val global_sort_partitions = optionsFinal.get("global_sort_partitions").get
-      val dateFormat = optionsFinal.get("dateformat").get
-      val delimeter = optionsFinal.get("delimiter").get
-      val complex_delimeter_level1 = optionsFinal.get("complex_delimiter_level_1").get
-      val complex_delimeter_level2 = optionsFinal.get("complex_delimiter_level_2").get
-      val all_dictionary_path = optionsFinal.get("all_dictionary_path").get
-      val column_dict = optionsFinal.get("columndict").get
+      val sort_scope = optionsFinal("sort_scope")
+      val single_pass = optionsFinal("single_pass")
+      val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
+      val bad_records_action = optionsFinal("bad_records_action")
+      val bad_record_path = optionsFinal("bad_record_path")
+      val global_sort_partitions = optionsFinal("global_sort_partitions")
+      val dateFormat = optionsFinal("dateformat")
+      val delimeter = optionsFinal("delimiter")
+      val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
+      val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
+      val all_dictionary_path = optionsFinal("all_dictionary_path")
+      val column_dict = optionsFinal("columndict")
       if (sort_scope.equals("GLOBAL_SORT") &&
           single_pass.equals("TRUE")) {
         sys.error("Global_Sort can't be used with single_pass flow")
@@ -531,13 +567,13 @@ case class LoadTable(
       carbonLoadModel.setBadRecordsLocation(bad_record_path)
 
       ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
-      carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar").get, "\\"))
-      carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar").get, "\""))
-      carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar").get, "#"))
+      carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
+      carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
+      carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
 
       // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
       // we should use table schema to generate file header.
-      var fileHeader = optionsFinal.get("fileheader").get
+      var fileHeader = optionsFinal("fileheader")
       val headerOption = options.get("header")
       if (headerOption.isDefined) {
         // whether the csv file has file header
@@ -559,7 +595,7 @@ case class LoadTable(
             // generate file header
             if (fileHeader.isEmpty) {
               fileHeader = table.getCreateOrderColumn(table.getFactTableName)
-                .asScala.map(_.getColName).mkString(",")
+                  .asScala.map(_.getColName).mkString(",")
             }
         }
       }
@@ -572,21 +608,21 @@ case class LoadTable(
         CarbonCommonConstants.CARBON_DATE_FORMAT,
         CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
       carbonLoadModel
-        .setSerializationNullFormat(
-          TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
-          optionsFinal.get("serialization_null_format").get)
+          .setSerializationNullFormat(
+            TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
+                optionsFinal("serialization_null_format"))
       carbonLoadModel
-        .setBadRecordsLoggerEnable(
-          TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
+          .setBadRecordsLoggerEnable(
+            TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
       carbonLoadModel
-        .setBadRecordsAction(
-          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action)
+          .setBadRecordsAction(
+            TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action)
       carbonLoadModel
-        .setIsEmptyDataBadRecord(
-          DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
-          optionsFinal.get("is_empty_data_bad_record").get)
+          .setIsEmptyDataBadRecord(
+            DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
+                optionsFinal("is_empty_data_bad_record"))
       carbonLoadModel.setSortScope(sort_scope)
-      carbonLoadModel.setBatchSortSizeInMb(optionsFinal.get("batch_sort_size_inmb").get)
+      carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
       carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
       carbonLoadModel.setUseOnePass(single_pass.toBoolean)
       if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
@@ -614,7 +650,7 @@ case class LoadTable(
         carbonLoadModel.setDirectLoad(true)
         carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
         val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
-          optionsFinal.get("maxcolumns").get)
+          optionsFinal("maxcolumns"))
         carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
         GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
         val storePath = relation.tableMeta.storePath
@@ -629,7 +665,7 @@ case class LoadTable(
         }
         // Create table and metadata folders if not exist
         val carbonTablePath = CarbonStorePath
-          .getCarbonTablePath(storePath, table.getCarbonTableIdentifier)
+            .getCarbonTablePath(storePath, table.getCarbonTableIdentifier)
         val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
         val fileType = FileFactory.getFileType(metadataDirectoryPath)
         if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
@@ -638,9 +674,9 @@ case class LoadTable(
         if (carbonLoadModel.getUseOnePass) {
           val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
           val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-            .getCarbonTableIdentifier
+              .getCarbonTableIdentifier
           val carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(storePath, carbonTableIdentifier)
+              .getCarbonTablePath(storePath, carbonTableIdentifier)
           val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
           val dimensions = carbonTable.getDimensionByTableName(
             carbonTable.getFactTableName).asScala.toArray
@@ -649,37 +685,37 @@ case class LoadTable(
             carbonLoadModel.initPredefDictMap()
             // generate predefined dictionary
             GlobalDictionaryUtil
-              .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
-                dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath)
+                .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
+                  dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath)
           }
           if (!StringUtils.isEmpty(all_dictionary_path)) {
             carbonLoadModel.initPredefDictMap()
             GlobalDictionaryUtil
-              .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
-                carbonLoadModel,
-                storePath,
-                carbonTableIdentifier,
-                dictFolderPath,
-                dimensions,
-                all_dictionary_path)
+                .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
+                  carbonLoadModel,
+                  storePath,
+                  carbonTableIdentifier,
+                  dictFolderPath,
+                  dimensions,
+                  all_dictionary_path)
           }
           // dictionaryServerClient dictionary generator
           val dictionaryServerPort = carbonProperty
-            .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
-              CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
+              .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+                CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
           val sparkDriverHost = sparkSession.sqlContext.sparkContext.
-            getConf.get("spark.driver.host")
+              getConf.get("spark.driver.host")
           carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
           // start dictionary server when use one pass load and dimension with DICTIONARY
           // encoding is present.
           val allDimensions = table.getAllDimensions.asScala.toList
           val createDictionary = allDimensions.exists {
             carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-                               !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+                !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
           }
           val server: Option[DictionaryServer] = if (createDictionary) {
             val dictionaryServer = DictionaryServer
-              .getInstance(dictionaryServerPort.toInt)
+                .getInstance(dictionaryServerPort.toInt)
             carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
             sparkSession.sparkContext.addSparkListener(new SparkListener() {
               override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
@@ -707,22 +743,13 @@ case class LoadTable(
               CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
             // getting all fields except tupleId field as it is not required in the value
             var otherFields = fields.toSeq
-              .filter(field => !field.name
-                .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-              .map(field => {
-                if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
-                  new Column(field.name
-                    .substring(0,
-                      field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
-                } else {
-
-                  new Column(field.name)
-                }
-              })
+                .filter(field => !field.name
+                    .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+                .map(field => new Column(field.name))
 
             // extract tupleId field which will be used as a key
             val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-              .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId")
+                .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId")
             // use dataFrameWithoutTupleId as dictionaryDataFrame
             val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
             otherFields = otherFields :+ segIdColumn
@@ -733,12 +760,11 @@ case class LoadTable(
             (dataFrame, dataFrame)
           }
 
-          GlobalDictionaryUtil
-            .generateGlobalDictionary(
-              sparkSession.sqlContext,
-              carbonLoadModel,
-              relation.tableMeta.storePath,
-              dictionaryDataFrame)
+          GlobalDictionaryUtil.generateGlobalDictionary(
+                sparkSession.sqlContext,
+                carbonLoadModel,
+                relation.tableMeta.storePath,
+                dictionaryDataFrame)
           CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,
@@ -759,14 +785,14 @@ case class LoadTable(
           val fileType = FileFactory.getFileType(partitionLocation)
           if (FileFactory.isFileExist(partitionLocation, fileType)) {
             val file = FileFactory
-              .getCarbonFile(partitionLocation, fileType)
+                .getCarbonFile(partitionLocation, fileType)
             CarbonUtil.deleteFoldersAndFiles(file)
           }
         } catch {
           case ex: Exception =>
             LOGGER.error(ex)
             LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
-                         "Problem deleting the partition folder")
+                "Problem deleting the partition folder")
             throw ex
         }
 
@@ -791,40 +817,19 @@ case class LoadTable(
   }
 }
 
-private[sql] case class DeleteLoadByDate(
+case class CleanFiles(
     databaseNameOp: Option[String],
-    tableName: String,
-    dateField: String,
-    loadDate: String) {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    tableName: String) extends RunnableCommand with DataProcessCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
-    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
-    CarbonStore.deleteLoadByDate(
-      loadDate,
-      getDB.getDatabaseName(databaseNameOp, sparkSession),
-      tableName,
-      carbonTable
-    )
-    Seq.empty
+    processData(sparkSession)
   }
-}
-
-case class CleanFiles(
-    databaseNameOp: Option[String],
-    tableName: String) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  def run(sparkSession: SparkSession): Seq[Row] = {
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val relation = catalog
-      .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
     val carbonTable = relation.tableMeta.carbonTable
     CarbonStore.cleanFiles(
       getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -840,13 +845,17 @@ case class ShowLoads(
     databaseNameOp: Option[String],
     tableName: String,
     limit: Option[String],
-    override val output: Seq[Attribute]) extends RunnableCommand {
+    override val output: Seq[Attribute]) extends RunnableCommand with DataProcessCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
-      tableMeta.carbonTable
+        lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+        tableMeta.carbonTable
     CarbonStore.showSegments(
       getDB.getDatabaseName(databaseNameOp, sparkSession),
       tableName,
@@ -859,16 +868,20 @@ case class ShowLoads(
 case class CarbonDropTableCommand(ifExistsSet: Boolean,
     databaseNameOp: Option[String],
     tableName: String)
-  extends RunnableCommand {
+  extends RunnableCommand with SchemaProcessCommand with DataProcessCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+    processData(sparkSession)
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
     val identifier = TableIdentifier(tableName, Option(dbName))
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
     val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val storePath = catalog.storePath
     catalog.checkSchemasModifiedTimeAndReloadTables()
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
@@ -878,7 +891,7 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
 
       CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .dropTable(storePath, identifier)(sparkSession)
+          .dropTable(catalog.storePath, identifier)(sparkSession)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
     } catch {
       case ex: Exception =>
@@ -889,29 +902,57 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
         if (unlocked) {
           logInfo("Table MetaData Unlocked Successfully")
         }
-        // deleting any remaining files.
-        val metadataFilePath = CarbonStorePath
-          .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
-        val fileType = FileFactory.getFileType(metadataFilePath)
-        if (FileFactory.isFileExist(metadataFilePath, fileType)) {
-          val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
-          CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
-        }
       }
     }
     Seq.empty
   }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    // delete the table folder
+    val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
+    val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val metadataFilePath = CarbonStorePath
+        .getCarbonTablePath(catalog.storePath, carbonTableIdentifier).getMetadataDirectoryPath
+    val fileType = FileFactory.getFileType(metadataFilePath)
+    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+    }
+    Seq.empty
+  }
 }
 
 private[sql] case class DescribeCommandFormatted(
     child: SparkPlan,
     override val output: Seq[Attribute],
     tblIdentifier: TableIdentifier)
-  extends RunnableCommand {
+  extends RunnableCommand with SchemaProcessCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+  }
+
+  private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
+    var results: Seq[(String, String, String)] =
+      Seq(("", "", ""), ("##Column Group Information", "", ""))
+    val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
+      case (groupId, _) => groupId != -1
+    }.toSeq.sortBy(_._1)
+    val groups = groupedDimensions.map(colGroups => {
+      colGroups._2.map(dim => dim.getColName).mkString(", ")
+    })
+    var index = 1
+    groups.foreach { x =>
+      results = results :+ (s"Column Group $index", x, "")
+      index = index + 1
+    }
+    results
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+        .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
     val mapper = new ObjectMapper()
     val colProps = StringBuilder.newBuilder
     val dims = relation.metaData.dims.map(x => x.toLowerCase)
@@ -922,8 +963,8 @@ private[sql] case class DescribeCommandFormatted(
           relation.tableMeta.carbonTableIdentifier.getTableName, fieldName)
         if (null != dimension.getColumnProperties && !dimension.getColumnProperties.isEmpty) {
           colProps.append(fieldName).append(".")
-            .append(mapper.writeValueAsString(dimension.getColumnProperties))
-            .append(",")
+              .append(mapper.writeValueAsString(dimension.getColumnProperties))
+              .append(",")
         }
         if (dimension.hasEncoding(Encoding.DICTIONARY) &&
             !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
@@ -950,7 +991,7 @@ private[sql] case class DescribeCommandFormatted(
     }
     results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
     results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
-      .getDatabaseName, "")
+        .getDatabaseName, "")
     )
     results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
     results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
@@ -964,34 +1005,17 @@ private[sql] case class DescribeCommandFormatted(
     }
     results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
       relation.tableMeta.carbonTableIdentifier.getTableName).asScala
-      .map(column => column).mkString(","), ""))
+        .map(column => column).mkString(","), ""))
     val dimension = carbonTable
-      .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+        .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
     results ++= getColumnGroups(dimension.asScala.toList)
     if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
       results ++=
-      Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName)
-        .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), ""))
+          Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+              .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), ""))
     }
     results.map { case (name, dataType, comment) =>
       Row(f"$name%-36s", f"$dataType%-80s", f"$comment%-72s")
     }
   }
-
-  private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
-    var results: Seq[(String, String, String)] =
-      Seq(("", "", ""), ("##Column Group Information", "", ""))
-    val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
-      case (groupId, _) => groupId != -1
-    }.toSeq.sortBy(_._1)
-    val groups = groupedDimensions.map(colGroups => {
-      colGroups._2.map(dim => dim.getColName).mkString(", ")
-    })
-    var index = 1
-    groups.foreach { x =>
-      results = results :+ (s"Column Group $index", x, "")
-      index = index + 1
-    }
-    results
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7f0b108/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 048681c..549841b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -430,8 +430,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
       // while drop we should refresh the schema modified time so that if any thing has changed
       // in the other beeline need to update.
       checkSchemasModifiedTimeAndReloadTables
-      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
-      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+
       val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
         tableIdentifier.table)
       metadataToBeRemoved match {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7f0b108/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 03d0bde..a8f92ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -163,22 +163,11 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
     (sparkSession: SparkSession): Unit = {
     val dbName = tableIdentifier.database.get
     val tableName = tableIdentifier.table
-
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
-    val fileType = FileFactory.getFileType(metadataFilePath)
-
-    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
-      // while drop we should refresh the schema modified time so that if any thing has changed
-      // in the other beeline need to update.
-      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
-      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
-    }
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
     sparkSession.sessionState.catalog.refreshTable(tableIdentifier)

