carbondata-commits mailing list archives

From qiang...@apache.org
Subject [3/6] carbondata git commit: [CARBONDATA-1151] Refactor all carbon command to separate file in spark2 integration
Date Wed, 27 Sep 2017 06:34:06 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
new file mode 100644
index 0000000..897895a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, DataProcessCommand, RunnableCommand, UpdateTableModel}
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.util.{CausedBy, FileUtils}
+
+import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format
+import org.apache.carbondata.processing.constants.TableOptionConstant
+import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.newflow.exception.NoRetryException
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.ValidateUtil
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
+import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil}
+
+case class LoadTableCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    factPathFromUser: String,
+    dimFilesPath: Seq[DataLoadTableFileMapping],
+    options: scala.collection.immutable.Map[String, String],
+    isOverwriteTable: Boolean,
+    var inputSqlString: String = null,
+    dataFrame: Option[DataFrame] = None,
+    updateModel: Option[UpdateTableModel] = None)
+  extends RunnableCommand with DataProcessCommand {
+
+  private def getFinalOptions(carbonProperty: CarbonProperties):
+  scala.collection.mutable.Map[String, String] = {
+    val optionsFinal = scala.collection.mutable.Map[String, String]()
+    optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
+    optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
+    optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
+    optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\"))
+    optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
+    optionsFinal.put("columndict", options.getOrElse("columndict", null))
+    optionsFinal
+      .put("serialization_null_format", options.getOrElse("serialization_null_format", "\\N"))
+    optionsFinal.put("bad_records_logger_enable", options.getOrElse("bad_records_logger_enable",
+      carbonProperty
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
+    val badRecordActionValue = carbonProperty
+      .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+    optionsFinal.put("bad_records_action", options.getOrElse("bad_records_action", carbonProperty
+      .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+        badRecordActionValue)))
+    optionsFinal
+      .put("is_empty_data_bad_record", options.getOrElse("is_empty_data_bad_record", carbonProperty
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
+    optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
+    optionsFinal
+      .put("complex_delimiter_level_1", options.getOrElse("complex_delimiter_level_1", "\\$"))
+    optionsFinal
+      .put("complex_delimiter_level_2", options.getOrElse("complex_delimiter_level_2", "\\:"))
+    optionsFinal.put("dateformat", options.getOrElse("dateformat",
+      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+        CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
+
+    optionsFinal.put("global_sort_partitions", options.getOrElse("global_sort_partitions",
+      carbonProperty
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null)))
+
+    optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
+
+    optionsFinal.put("batch_sort_size_inmb", options.getOrElse("batch_sort_size_inmb",
+      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+        carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+          CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
+
+    optionsFinal.put("bad_record_path", options.getOrElse("bad_record_path",
+      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+        carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
+
+    val useOnePass = options.getOrElse("single_pass",
+      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
+      case "true" =>
+        true
+      case "false" =>
+        // when single_pass = false and either all_dictionary_path
+        // or columndict is configured, do not allow the load
+        if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
+            StringUtils.isNotEmpty(optionsFinal("columndict"))) {
+          throw new MalformedCarbonCommandException(
+            "Can not use all_dictionary_path or columndict without single_pass.")
+        } else {
+          false
+        }
+      case illegal =>
+        val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+        LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
+                     "Please set it as 'true' or 'false'")
+        false
+    }
+    optionsFinal.put("single_pass", useOnePass.toString)
+    optionsFinal
+  }
+
+  private def checkDefaultValue(value: String, default: String) = {
+    if (StringUtils.isEmpty(value)) {
+      default
+    } else {
+      value
+    }
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    if (dataFrame.isDefined && updateModel.isEmpty) {
+      val rdd = dataFrame.get.rdd
+      if (rdd.partitions == null || rdd.partitions.length == 0) {
+        LOGGER.warn("DataLoading finished. No data was loaded.")
+        return Seq.empty
+      }
+    }
+
+    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    if (null == relation.tableMeta.carbonTable) {
+      LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
+      LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+      sys.error(s"Data loading failed. table not found: $dbName.$tableName")
+    }
+
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
+    val optionsFinal = getFinalOptions(carbonProperty)
+
+    val tableProperties = relation.tableMeta.carbonTable.getTableInfo
+      .getFactTable.getTableProperties
+
+    optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope",
+      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+
+    try {
+      val factPath = if (dataFrame.isDefined) {
+        ""
+      } else {
+        FileUtils.getPaths(
+          CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
+      }
+      val carbonLoadModel = new CarbonLoadModel()
+      carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+      carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
+      carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
+
+      val table = relation.tableMeta.carbonTable
+      carbonLoadModel.setTableName(table.getFactTableName)
+      val dataLoadSchema = new CarbonDataLoadSchema(table)
+      // Need to fill dimension relation
+      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+
+      val partitionLocation = relation.tableMeta.storePath + "/partition/" +
+                              relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
+                              relation.tableMeta.carbonTableIdentifier.getTableName + "/"
+      val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
+      val sort_scope = optionsFinal("sort_scope")
+      val single_pass = optionsFinal("single_pass")
+      val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
+      val bad_records_action = optionsFinal("bad_records_action")
+      val bad_record_path = optionsFinal("bad_record_path")
+      val global_sort_partitions = optionsFinal("global_sort_partitions")
+      val dateFormat = optionsFinal("dateformat")
+      val delimiter = optionsFinal("delimiter")
+      val complex_delimiter_level1 = optionsFinal("complex_delimiter_level_1")
+      val complex_delimiter_level2 = optionsFinal("complex_delimiter_level_2")
+      val all_dictionary_path = optionsFinal("all_dictionary_path")
+      val column_dict = optionsFinal("columndict")
+      ValidateUtil.validateDateFormat(dateFormat, table, tableName)
+      ValidateUtil.validateSortScope(table, sort_scope)
+
+      if (bad_records_logger_enable.toBoolean ||
+          LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
+        if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+          sys.error("Invalid bad records location.")
+        }
+      }
+      carbonLoadModel.setBadRecordsLocation(bad_record_path)
+
+      ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
+      carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
+      carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
+      carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
+
+      // if there is no file header in the csv file and the load sql doesn't provide the
+      // FILEHEADER option, we should use the table schema to generate the file header.
+      var fileHeader = optionsFinal("fileheader")
+      val headerOption = options.get("header")
+      if (headerOption.isDefined) {
+        // whether the csv file has file header
+        // the default value is true
+        val header = try {
+          headerOption.get.toBoolean
+        } catch {
+          case ex: IllegalArgumentException =>
+            throw new MalformedCarbonCommandException(
+              "'header' option should be either 'true' or 'false'. " + ex.getMessage)
+        }
+        if (header) {
+          if (fileHeader.nonEmpty) {
+            throw new MalformedCarbonCommandException(
+              "When 'header' option is true, 'fileheader' option is not required.")
+          }
+        } else {
+          if (fileHeader.isEmpty) {
+            fileHeader = table.getCreateOrderColumn(table.getFactTableName)
+              .asScala.map(_.getColName).mkString(",")
+          }
+        }
+      }
+
+      carbonLoadModel.setDateFormat(dateFormat)
+      carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
+        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+      carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
+        CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+      carbonLoadModel
+        .setSerializationNullFormat(
+          TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
+          optionsFinal("serialization_null_format"))
+      carbonLoadModel
+        .setBadRecordsLoggerEnable(
+          TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
+      carbonLoadModel
+        .setBadRecordsAction(
+          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action)
+      carbonLoadModel
+        .setIsEmptyDataBadRecord(
+          DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
+          optionsFinal("is_empty_data_bad_record"))
+      carbonLoadModel.setSortScope(sort_scope)
+      carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
+      carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
+      carbonLoadModel.setUseOnePass(single_pass.toBoolean)
+      if (delimiter.equalsIgnoreCase(complex_delimiter_level1) ||
+          complex_delimiter_level1.equalsIgnoreCase(complex_delimiter_level2) ||
+          delimiter.equalsIgnoreCase(complex_delimiter_level2)) {
+        sys.error(s"Field Delimiter & Complex types delimiter are same")
+      } else {
+        carbonLoadModel.setComplexDelimiterLevel1(
+          CarbonUtil.delimiterConverter(complex_delimiter_level1))
+        carbonLoadModel.setComplexDelimiterLevel2(
+          CarbonUtil.delimiterConverter(complex_delimiter_level2))
+      }
+      // set local dictionary path, and dictionary file extension
+      carbonLoadModel.setAllDictPath(all_dictionary_path)
+
+      val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+      try {
+        // The system has to partition the data first and then call the data load
+        LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+        carbonLoadModel.setFactFilePath(factPath)
+        carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
+        carbonLoadModel.setCsvHeader(fileHeader)
+        carbonLoadModel.setColDictFilePath(column_dict)
+        carbonLoadModel.setDirectLoad(true)
+        carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+        val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
+          optionsFinal("maxcolumns"))
+        carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
+        GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
+        val storePath = relation.tableMeta.storePath
+        // add the start entry for the new load in the table status file
+        if (updateModel.isEmpty) {
+          CommonUtil.
+            readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteTable)
+        }
+        if (isOverwriteTable) {
+          LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+        }
+        if (null == carbonLoadModel.getLoadMetadataDetails) {
+          CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+        }
+        if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
+            StringUtils.isEmpty(column_dict) && StringUtils.isEmpty(all_dictionary_path)) {
+          LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
+          LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
+          carbonLoadModel.setUseOnePass(false)
+        }
+        // Create table and metadata folders if not exist
+        val carbonTablePath = CarbonStorePath
+          .getCarbonTablePath(storePath, table.getCarbonTableIdentifier)
+        val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+        val fileType = FileFactory.getFileType(metadataDirectoryPath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+          FileFactory.mkdirs(metadataDirectoryPath, fileType)
+        }
+        if (carbonLoadModel.getUseOnePass) {
+          val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+          val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+            .getCarbonTableIdentifier
+          val carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(storePath, carbonTableIdentifier)
+          val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+          val dimensions = carbonTable.getDimensionByTableName(
+            carbonTable.getFactTableName).asScala.toArray
+          val colDictFilePath = carbonLoadModel.getColDictFilePath
+          if (!StringUtils.isEmpty(colDictFilePath)) {
+            carbonLoadModel.initPredefDictMap()
+            // generate predefined dictionary
+            GlobalDictionaryUtil
+              .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
+                dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath)
+          }
+          if (!StringUtils.isEmpty(all_dictionary_path)) {
+            carbonLoadModel.initPredefDictMap()
+            GlobalDictionaryUtil
+              .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
+                carbonLoadModel,
+                storePath,
+                carbonTableIdentifier,
+                dictFolderPath,
+                dimensions,
+                all_dictionary_path)
+          }
+          // dictionaryServerClient dictionary generator
+          val dictionaryServerPort = carbonProperty
+            .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+              CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
+          val sparkDriverHost = sparkSession.sqlContext.sparkContext.
+            getConf.get("spark.driver.host")
+          carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
+          // start the dictionary server when single-pass load is used and a dimension with
+          // DICTIONARY encoding is present.
+          val allDimensions = table.getAllDimensions.asScala.toList
+          val createDictionary = allDimensions.exists {
+            carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+                               !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+          }
+          val server: Option[DictionaryServer] = if (createDictionary) {
+            val dictionaryServer = DictionaryServer
+              .getInstance(dictionaryServerPort.toInt, carbonTable)
+            carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+            sparkSession.sparkContext.addSparkListener(new SparkListener() {
+              override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+                dictionaryServer.shutdown()
+              }
+            })
+            Some(dictionaryServer)
+          } else {
+            None
+          }
+          CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+            carbonLoadModel,
+            relation.tableMeta.storePath,
+            columnar,
+            partitionStatus,
+            server,
+            isOverwriteTable,
+            dataFrame,
+            updateModel)
+        } else {
+          val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
+            val fields = dataFrame.get.schema.fields
+            import org.apache.spark.sql.functions.udf
+            // extracting only segment from tupleId
+            val getSegIdUDF = udf((tupleId: String) =>
+              CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+            // getting all fields except tupleId field as it is not required in the value
+            var otherFields = fields.toSeq
+              .filter(field => !field.name
+                .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+              .map(field => new Column(field.name))
+
+            // extract tupleId field which will be used as a key
+            val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
+              .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
+              as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+            // use dataFrameWithoutTupleId as dictionaryDataFrame
+            val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
+            otherFields = otherFields :+ segIdColumn
+            // use dataFrameWithTupleId as loadDataFrame
+            val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
+            (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
+          } else {
+            (dataFrame, dataFrame)
+          }
+
+          GlobalDictionaryUtil.generateGlobalDictionary(
+            sparkSession.sqlContext,
+            carbonLoadModel,
+            relation.tableMeta.storePath,
+            dictionaryDataFrame)
+          CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+            carbonLoadModel,
+            relation.tableMeta.storePath,
+            columnar,
+            partitionStatus,
+            None,
+            isOverwriteTable,
+            loadDataFrame,
+            updateModel)
+        }
+      } catch {
+        case CausedBy(ex: NoRetryException) =>
+          LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
+          throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
+        case ex: Exception =>
+          LOGGER.error(ex)
+          LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
+          throw ex
+      } finally {
+        // Once the data load is successful delete the unwanted partition files
+        try {
+          val fileType = FileFactory.getFileType(partitionLocation)
+          if (FileFactory.isFileExist(partitionLocation, fileType)) {
+            val file = FileFactory
+              .getCarbonFile(partitionLocation, fileType)
+            CarbonUtil.deleteFoldersAndFiles(file)
+          }
+        } catch {
+          case ex: Exception =>
+            LOGGER.error(ex)
+            LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
+                         "Problem deleting the partition folder")
+            throw ex
+        }
+
+      }
+    } catch {
+      case dle: DataLoadingException =>
+        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
+        throw dle
+      case mce: MalformedCarbonCommandException =>
+        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
+        throw mce
+    }
+    Seq.empty
+  }
+
+  private def updateTableMetadata(
+      carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      model: DictionaryLoadModel,
+      noDictDimension: Array[CarbonDimension]): Unit = {
+    val sparkSession = sqlContext.sparkSession
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
+      model.table)
+
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    // read TableInfo
+    val tableInfo: format.TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+
+    // modify TableInfo
+    val columns = tableInfo.getFact_table.getTable_columns
+    for (i <- 0 until columns.size) {
+      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
+        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
+      }
+    }
+    val entry = tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+    entry.setTime_stamp(System.currentTimeMillis())
+
+    // write TableInfo
+    metastore.updateTableSchema(carbonTablePath.getCarbonTableIdentifier,
+      carbonTablePath.getCarbonTableIdentifier,
+      tableInfo, entry, carbonTablePath.getPath)(sparkSession)
+
+    // update the schema modified time
+    metastore.updateAndTouchSchemasUpdatedTime(model.hdfsLocation)
+
+    // update CarbonDataLoadSchema
+    val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName),
+      model.table.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
+      .carbonTable
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+  }
+}
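For context, LoadTableCommand is the execution target of CarbonData's LOAD DATA statement. A minimal usage sketch, assuming a CarbonSession named spark and an existing table default.t1 (the CSV path, table name and option values below are illustrative and not part of this patch):

    // via SQL, which the Carbon parser is expected to plan into a LoadTableCommand
    spark.sql(
      "LOAD DATA INPATH '/tmp/sample.csv' INTO TABLE default.t1 " +
      "OPTIONS('DELIMITER'=',', 'BAD_RECORDS_ACTION'='FORCE', 'SINGLE_PASS'='false')")

    // or by constructing the command directly, using the same option keys that
    // getFinalOptions above resolves against CarbonProperties defaults
    LoadTableCommand(
      databaseNameOp = Some("default"),
      tableName = "t1",
      factPathFromUser = "/tmp/sample.csv",
      dimFilesPath = Seq.empty,
      options = Map("delimiter" -> ",", "bad_records_action" -> "FORCE"),
      isOverwriteTable = false).run(spark)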

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
new file mode 100644
index 0000000..02c7023
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.mutation
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
+import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
+import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.processing.exception.MultipleMatchingException
+import org.apache.carbondata.spark.DeleteDelataResultImpl
+import org.apache.carbondata.spark.load.FailureCauses
+import org.apache.carbondata.spark.util.QueryPlanUtil
+
+object DeleteExecution {
+  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def getTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
+    if (tableIdentifier.size > 1) {
+      TableIdentifier(tableIdentifier(1), Some(tableIdentifier(0)))
+    } else {
+      TableIdentifier(tableIdentifier(0), None)
+    }
+  }
+
+  def deleteDeltaExecution(
+      identifier: Seq[String],
+      sparkSession: SparkSession,
+      dataRdd: RDD[Row],
+      timestamp: String, relation: CarbonRelation, isUpdateOperation: Boolean,
+      executorErrors: ExecutionErrors
+  ): Boolean = {
+
+    var res: Array[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] = null
+    val tableName = getTableIdentifier(identifier).table
+    val database = GetDB.getDatabaseName(getTableIdentifier(identifier).database, sparkSession)
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
+      asInstanceOf[CarbonRelation]
+
+    val storeLocation = relation.tableMeta.storePath
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = new
+        AbsoluteTableIdentifier(storeLocation,
+          relation.tableMeta.carbonTableIdentifier)
+    val tablePath = CarbonStorePath.getCarbonTablePath(
+      storeLocation,
+      absoluteTableIdentifier.getCarbonTableIdentifier)
+    val factPath = tablePath.getFactDir
+
+    val carbonTable = relation.tableMeta.carbonTable
+    var deleteStatus = true
+    val deleteRdd = if (isUpdateOperation) {
+      val schema =
+        org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField(
+          CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
+          org.apache.spark.sql.types.StringType)))
+      val rdd = dataRdd
+        .map(row => Row(row.get(row.fieldIndex(
+          CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
+      sparkSession.createDataFrame(rdd, schema).rdd
+      // sqlContext.createDataFrame(rdd, schema).rdd
+    } else {
+      dataRdd
+    }
+
+    val (carbonInputFormat, job) =
+      QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
+    CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
+    val keyRdd = deleteRdd.map({ row =>
+      val tupleId: String = row
+        .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+      val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
+      (key, row)
+    }).groupByKey()
+
+    // if no loads are present then no need to do anything.
+    if (keyRdd.partitions.length == 0) {
+      return true
+    }
+
+    val blockMappingVO = carbonInputFormat.getBlockRowCount(job, absoluteTableIdentifier)
+    val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier)
+    CarbonUpdateUtil
+      .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
+
+    val rowContRdd =
+      sparkSession.sparkContext.parallelize(
+        blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
+        keyRdd.partitions.length)
+
+    val rdd = rowContRdd.join(keyRdd)
+    res = rdd.mapPartitionsWithIndex(
+      (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) =>
+        Iterator[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] {
+
+          var result = List[(String, (SegmentUpdateDetails, ExecutionErrors))]()
+          while (records.hasNext) {
+            val ((key), (rowCountDetailsVO, groupedRows)) = records.next
+            result = result ++
+                     deleteDeltaFunc(index,
+                       key,
+                       groupedRows.toIterator,
+                       timestamp,
+                       rowCountDetailsVO)
+          }
+          result
+        }
+    ).collect()
+
+    // if there are no results then there is nothing to do.
+    if (res.isEmpty) {
+      return true
+    }
+
+    // update new status file
+    checkAndUpdateStatusFiles()
+
+    // all or none: update the status file only if the complete delete operation is successful.
+    def checkAndUpdateStatusFiles(): Unit = {
+      val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
+      val segmentDetails = new util.HashSet[String]()
+      res.foreach(resultOfSeg => resultOfSeg.foreach(
+        resultOfBlock => {
+          if (resultOfBlock._1.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)) {
+            blockUpdateDetailsList.add(resultOfBlock._2._1)
+            segmentDetails.add(resultOfBlock._2._1.getSegmentName)
+            // if this block is invalid then decrement block count in map.
+            if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getStatus)) {
+              CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
+                blockMappingVO.getSegmentNumberOfBlockMapping)
+            }
+          }
+          else {
+            deleteStatus = false
+            // In case of failure , clean all related delete delta files
+            CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+            LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
+            val errorMsg =
+              "Delete data operation is failed due to failure in creating delete delta file for " +
+              "segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
+              resultOfBlock._2._1.getBlockName
+            executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
+            executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
+
+            if (executorErrors.failureCauses == FailureCauses.NONE) {
+              executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+              executorErrors.errorMsg = errorMsg
+            }
+            LOGGER.error(errorMsg)
+            return
+          }
+        }
+      )
+      )
+
+      val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
+        .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
+
+
+
+      // this is the delete flow, so there is no need to put a timestamp in the status file.
+      if (CarbonUpdateUtil
+            .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false) &&
+          CarbonUpdateUtil
+            .updateTableMetadataStatus(segmentDetails,
+              carbonTable,
+              timestamp,
+              !isUpdateOperation,
+              listOfSegmentToBeMarkedDeleted)
+      ) {
+        LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }")
+        LOGGER.audit(s"Delete data operation is successful for ${ database }.${ tableName }")
+      }
+      else {
+        // In case of failure , clean all related delete delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+
+        val errorMessage = "Delete data operation is failed due to failure " +
+                           "in table status updation."
+        LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
+        LOGGER.error("Delete data operation is failed due to failure in table status updation.")
+        executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE
+        executorErrors.errorMsg = errorMessage
+        // throw new Exception(errorMessage)
+      }
+    }
+
+    def deleteDeltaFunc(index: Int,
+        key: String,
+        iter: Iterator[Row],
+        timestamp: String,
+        rowCountDetailsVO: RowCountDetailsVO
+    ): Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] = {
+
+      val result = new DeleteDelataResultImpl()
+      var deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+      val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+      // here key = segment/blockName
+      val blockName = CarbonUpdateUtil
+        .getBlockName(
+          CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
+      val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0)
+      val deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
+      val resultIter = new Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] {
+        val segmentUpdateDetails = new SegmentUpdateDetails()
+        var TID = ""
+        var countOfRows = 0
+        try {
+          while (iter.hasNext) {
+            val oneRow = iter.next
+            TID = oneRow
+              .get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString
+            val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET)
+            val blockletId = CarbonUpdateUtil
+              .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID)
+            val pageId = Integer.parseInt(CarbonUpdateUtil
+              .getRequiredFieldFromTID(TID, TupleIdEnum.PAGE_ID))
+            val isValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
+            // stop delete operation
+            if (!isValidOffset) {
+              executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING
+              executorErrors.errorMsg = "Multiple input rows matched for same row."
+              throw new MultipleMatchingException("Multiple input rows matched for same row.")
+            }
+            countOfRows = countOfRows + 1
+          }
+
+          val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath)
+          val completeBlockName = CarbonTablePath
+            .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
+                               CarbonCommonConstants.FACT_FILE_EXT)
+          val deleteDeletaPath = CarbonUpdateUtil
+            .getDeleteDeltaFilePath(blockPath, blockName, timestamp)
+          val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeletaPath,
+            FileFactory.getFileType(deleteDeletaPath))
+
+
+
+          segmentUpdateDetails.setBlockName(blockName)
+          segmentUpdateDetails.setActualBlockName(completeBlockName)
+          segmentUpdateDetails.setSegmentName(segmentId)
+          segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp)
+          segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp)
+
+          val alreadyDeletedRows: Long = rowCountDetailsVO.getDeletedRowsInBlock
+          val totalDeletedRows: Long = alreadyDeletedRows + countOfRows
+          segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString)
+          if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) {
+            segmentUpdateDetails.setStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
+          }
+          else {
+            // write the delta file
+            carbonDeleteWriter.write(deleteDeltaBlockDetails)
+          }
+
+          deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+        } catch {
+          case e : MultipleMatchingException =>
+            LOGGER.audit(e.getMessage)
+            LOGGER.error(e.getMessage)
+          // don't throw an exception here.
+          case e: Exception =>
+            val errorMsg = s"Delete data operation is failed for ${ database }.${ tableName }."
+            LOGGER.audit(errorMsg)
+            LOGGER.error(errorMsg + e.getMessage)
+            throw e
+        }
+
+
+        var finished = false
+
+        override def hasNext: Boolean = {
+          if (!finished) {
+            finished = true
+            finished
+          }
+          else {
+            !finished
+          }
+        }
+
+        override def next(): (String, (SegmentUpdateDetails, ExecutionErrors)) = {
+          finished = true
+          result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors))
+        }
+      }
+      resultIter
+    }
+
+    true
+  }
+}
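For orientation, the grouping key built in deleteDeltaExecution above is simply segment/blockName, derived from the implicit tuple id column. A hedged sketch of that decomposition, where the tuple id value and its segmentId/blockId/blockletId/pageId/offset layout are assumptions for illustration (the real layout is defined by TupleIdEnum, not in this patch):

    val tupleId = "0/part-0-0_batchno0-0/0/0/5"                                    // illustrative only
    val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)                  // "segment/blockName"
    val segmentId = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID)
    val offset = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.OFFSET)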

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
new file mode 100644
index 0000000..34daf4e
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.mutation
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.command.AlterTableModel
+import org.apache.spark.sql.execution.command.management.AlterTableCompactionCommand
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
+
+object HorizontalCompaction {
+
+  val LOG: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+  /**
+   * Performs horizontal compaction. tryHorizontalCompaction is called after an Update or
+   * Delete operation completes. When it is called after an Update statement, Update Compaction
+   * followed by Delete Compaction is performed; when it is called after a Delete statement,
+   * only Delete Compaction is performed.
+   */
+  def tryHorizontalCompaction(
+      sparkSession: SparkSession,
+      carbonRelation: CarbonRelation,
+      isUpdateOperation: Boolean): Unit = {
+
+    if (!CarbonDataMergerUtil.isHorizontalCompactionEnabled) {
+      return
+    }
+
+    var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+    val carbonTable = carbonRelation.tableMeta.carbonTable
+    val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+    val updateTimeStamp = System.currentTimeMillis()
+    // Make sure that the update and delete timestamps are not the same;
+    // distinct timestamps are required for the status metadata commit and cleanup
+    val deleteTimeStamp = updateTimeStamp + 1
+
+    // get the valid segments
+    var segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
+
+    if (segLists == null || segLists.size() == 0) {
+      return
+    }
+
+    // Avoid reading the Table Status file from disk every time. It is better to load it
+    // in memory at the start and pass it along to the routines. The constructor of
+    // SegmentUpdateStatusManager reads the Table Status file and the Table Update Status
+    // file and saves the content in segmentDetails and updateDetails respectively.
+    val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
+      absTableIdentifier)
+
+    if (isUpdateOperation) {
+
+      // This is only update operation, perform only update compaction.
+      compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+      performUpdateDeltaCompaction(sparkSession,
+        compactionTypeIUD,
+        carbonTable,
+        absTableIdentifier,
+        segmentUpdateStatusManager,
+        updateTimeStamp,
+        segLists)
+    }
+
+    // After Update Compaction perform delete compaction
+    compactionTypeIUD = CompactionType.IUD_DELETE_DELTA_COMPACTION
+    segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
+    if (segLists == null || segLists.size() == 0) {
+      return
+    }
+
+    // Delete Compaction
+    performDeleteDeltaCompaction(sparkSession,
+      compactionTypeIUD,
+      carbonTable,
+      absTableIdentifier,
+      segmentUpdateStatusManager,
+      deleteTimeStamp,
+      segLists)
+  }
+
+  /**
+   * Update Delta Horizontal Compaction.
+   */
+  private def performUpdateDeltaCompaction(sparkSession: SparkSession,
+      compactionTypeIUD: CompactionType,
+      carbonTable: CarbonTable,
+      absTableIdentifier: AbsoluteTableIdentifier,
+      segmentUpdateStatusManager: SegmentUpdateStatusManager,
+      factTimeStamp: Long,
+      segLists: util.List[String]): Unit = {
+    val db = carbonTable.getDatabaseName
+    val table = carbonTable.getFactTableName
+    // get the valid segments qualified for update compaction.
+    val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
+      absTableIdentifier,
+      segmentUpdateStatusManager,
+      compactionTypeIUD)
+
+    if (validSegList.size() == 0) {
+      return
+    }
+
+    LOG.info(s"Horizontal Update Compaction operation started for [$db.$table].")
+    LOG.audit(s"Horizontal Update Compaction operation started for [$db.$table].")
+
+    try {
+      // Update Compaction.
+      val alterTableModel = AlterTableModel(Option(carbonTable.getDatabaseName),
+        carbonTable.getFactTableName,
+        Some(segmentUpdateStatusManager),
+        CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
+        Some(factTimeStamp),
+        "")
+
+      AlterTableCompactionCommand(alterTableModel).run(sparkSession)
+    }
+    catch {
+      case e: Exception =>
+        val msg = if (null != e.getMessage) {
+          e.getMessage
+        } else {
+          "Please check logs for more info"
+        }
+        throw new HorizontalCompactionException(
+          s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
+    }
+    LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
+    LOG.audit(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
+  }
+
+  /**
+   * Delete Delta Horizontal Compaction.
+   */
+  private def performDeleteDeltaCompaction(sparkSession: SparkSession,
+      compactionTypeIUD: CompactionType,
+      carbonTable: CarbonTable,
+      absTableIdentifier: AbsoluteTableIdentifier,
+      segmentUpdateStatusManager: SegmentUpdateStatusManager,
+      factTimeStamp: Long,
+      segLists: util.List[String]): Unit = {
+
+    val db = carbonTable.getDatabaseName
+    val table = carbonTable.getFactTableName
+    val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
+      absTableIdentifier,
+      segmentUpdateStatusManager,
+      compactionTypeIUD)
+
+    if (deletedBlocksList.size() == 0) {
+      return
+    }
+
+    LOG.info(s"Horizontal Delete Compaction operation started for [$db.$table].")
+    LOG.audit(s"Horizontal Delete Compaction operation started for [$db.$table].")
+
+    try {
+
+      // Delete Compaction RDD
+      val rdd1 = sparkSession.sparkContext
+        .parallelize(deletedBlocksList.asScala, deletedBlocksList.size())
+
+      val timestamp = factTimeStamp
+      val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails
+      val result = rdd1.mapPartitions(iter =>
+        new Iterator[Seq[CarbonDataMergerUtilResult]] {
+          override def hasNext: Boolean = iter.hasNext
+
+          override def next(): Seq[CarbonDataMergerUtilResult] = {
+            val segmentAndBlocks = iter.next
+            val segment = segmentAndBlocks.substring(0, segmentAndBlocks.lastIndexOf("/"))
+            val blockName = segmentAndBlocks
+              .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length)
+
+            val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName,
+              absTableIdentifier,
+              updateStatusDetails,
+              timestamp)
+
+            result.asScala.toList
+
+          }
+        }).collect
+
+      val resultList = ListBuffer[CarbonDataMergerUtilResult]()
+      result.foreach(x => {
+        x.foreach(y => {
+          resultList += y
+        })
+      })
+
+      val updateStatus = CarbonDataMergerUtil.updateStatusFile(resultList.toList.asJava,
+        carbonTable,
+        timestamp.toString,
+        segmentUpdateStatusManager)
+      if (!updateStatus) {
+        LOG.audit(s"Delete Compaction data operation is failed for [$db.$table].")
+        LOG.error("Delete Compaction data operation is failed.")
+        throw new HorizontalCompactionException(
+          s"Horizontal Delete Compaction Failed for [$db.$table] ." +
+          s" Please check logs for more info.", factTimeStamp)
+      }
+      else {
+        LOG.info(s"Horizontal Delete Compaction operation completed for [$db.$table].")
+        LOG.audit(s"Horizontal Delete Compaction operation completed for [$db.$table].")
+      }
+    }
+    catch {
+      case e: Exception =>
+        val msg = if (null != e.getMessage) {
+          e.getMessage
+        } else {
+          "Please check logs for more info"
+        }
+        throw new HorizontalCompactionException(
+          s"Horizontal Delete Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
+    }
+  }
+}
+
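As a usage note, tryHorizontalCompaction takes the session, the Carbon relation and a flag indicating whether an update preceded it. A minimal sketch mirroring the call made from ProjectForDeleteCommand later in this patch, assuming a CarbonSession named spark and an existing table default.t1 (both illustrative):

    val relation = CarbonEnv.getInstance(spark).carbonMetastore
      .lookupRelation(Some("default"), "t1")(spark).asInstanceOf[CarbonRelation]
    // after a DELETE completes, only the delete-delta compaction pass is attempted
    HorizontalCompaction.tryHorizontalCompaction(spark, relation, isUpdateOperation = false)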

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompactionException.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompactionException.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompactionException.scala
new file mode 100644
index 0000000..60ae078
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompactionException.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.mutation
+
+class HorizontalCompactionException(
+    message: String,
+    // required for cleanup
+    val compactionTimeStamp: Long) extends RuntimeException(message) {
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
new file mode 100644
index 0000000..9cf8a91
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.mutation
+
+import org.apache.spark.sql.{CarbonEnv, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.spark.load.FailureCauses
+
+/**
+ * IUD update delete and compaction framework.
+ *
+ */
+private[sql] case class ProjectForDeleteCommand(
+    plan: LogicalPlan,
+    identifier: Seq[String],
+    timestamp: String) extends RunnableCommand with DataProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val dataFrame = Dataset.ofRows(sparkSession, plan)
+    //    dataFrame.show(truncate = false)
+    //    dataFrame.collect().foreach(println)
+    val dataRdd = dataFrame.rdd
+
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
+      asInstanceOf[CarbonRelation]
+    val carbonTable = relation.tableMeta.carbonTable
+    val metadataLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        LockUsage.METADATA_LOCK)
+    var lockStatus = false
+    try {
+      lockStatus = metadataLock.lockWithRetries()
+      LOGGER.audit(s"Delete data request has been received " +
+                   s"for ${ relation.databaseName }.${ relation.tableName }.")
+      if (lockStatus) {
+        LOGGER.info("Successfully acquired the table metadata file lock")
+      } else {
+        throw new Exception("Table is locked for deletion. Please try after some time")
+      }
+      val executorErrors = ExecutionErrors(FailureCauses.NONE, "")
+
+      // handle the clean up of IUD.
+      CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
+
+      if (DeleteExecution
+        .deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp, relation,
+          isUpdateOperation = false, executorErrors)) {
+        // call IUD Compaction.
+        HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation,
+          isUpdateOperation = false)
+      }
+    } catch {
+      case e: HorizontalCompactionException =>
+        LOGGER.error("Delete operation succeeded, but horizontal compaction failed." +
+                     " Please check logs. " + e.getMessage)
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
+
+      case e: Exception =>
+        LOGGER.error(e, "Exception in Delete data operation: " + e.getMessage)
+        // ****** start clean up.
+        // In case of failure, clean all related delete delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+
+        // clean up. Null check is required because the executor error message is sometimes null
+        if (null != e.getMessage) {
+          sys.error("Delete data operation failed. " + e.getMessage)
+        } else {
+          sys.error("Delete data operation failed. Please check logs.")
+        }
+    } finally {
+      if (lockStatus) {
+        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
+      }
+    }
+    Seq.empty
+  }
+}
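
The metadata-lock handling above follows a plain acquire/try/finally shape. For reference, a standalone sketch of that pattern, reusing the CarbonLockFactory/CarbonLockUtil calls shown in the diff with a hypothetical doDelete placeholder for the delete-delta work (imports assumed as in the file above):

    // Sketch of the metadata-lock pattern used by ProjectForDeleteCommand.
    // doDelete is a hypothetical placeholder for the delete-delta work.
    def withMetadataLock(carbonTable: CarbonTable)(doDelete: => Unit): Unit = {
      val metadataLock = CarbonLockFactory.getCarbonLockObj(
        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
        LockUsage.METADATA_LOCK)
      var lockStatus = false
      try {
        lockStatus = metadataLock.lockWithRetries()
        if (!lockStatus) {
          throw new Exception("Table is locked for deletion. Please try after some time")
        }
        doDelete
      } finally {
        if (lockStatus) {
          CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
        }
      }
    }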

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
new file mode 100644
index 0000000..036ca49
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.mutation
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.management.LoadTableCommand
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.spark.load.FailureCauses
+
+private[sql] case class ProjectForUpdateCommand(
+    plan: LogicalPlan, tableIdentifier: Seq[String])
+  extends RunnableCommand with DataProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName)
+
+    val res = plan find {
+      case relation: LogicalRelation if relation.relation
+        .isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        true
+      case _ => false
+    }
+
+    if (res.isEmpty) {
+      return Seq.empty
+    }
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
+      asInstanceOf[CarbonRelation]
+    val carbonTable = relation.tableMeta.carbonTable
+    val metadataLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        LockUsage.METADATA_LOCK)
+    var lockStatus = false
+    // get the current time stamp which should be same for delete and update.
+    val currentTime = CarbonUpdateUtil.readCurrentTime
+    var dataSet: DataFrame = null
+    val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset()
+    try {
+      lockStatus = metadataLock.lockWithRetries()
+      if (lockStatus) {
+        logInfo("Successfully acquired the table metadata file lock")
+      } else {
+        throw new Exception("Table is locked for update. Please try after some time")
+      }
+      val tablePath = CarbonStorePath.getCarbonTablePath(
+        carbonTable.getStorePath,
+        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
+      // Get RDD.
+
+      dataSet = if (isPersistEnabled) {
+        Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
+          CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
+      } else {
+        Dataset.ofRows(sparkSession, plan)
+      }
+      val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
+
+      // handle the clean up of IUD.
+      CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
+
+      // do delete operation.
+      DeleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
+        currentTime + "",
+        relation, isUpdateOperation = true, executionErrors)
+
+      if (executionErrors.failureCauses != FailureCauses.NONE) {
+        throw new Exception(executionErrors.errorMsg)
+      }
+
+      // do update operation.
+      performUpdate(dataSet, tableIdentifier, plan, sparkSession, currentTime, executionErrors)
+
+      if (executionErrors.failureCauses != FailureCauses.NONE) {
+        throw new Exception(executionErrors.errorMsg)
+      }
+
+      // Do IUD Compaction.
+      HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true)
+    } catch {
+      case e: HorizontalCompactionException =>
+        LOGGER.error(
+          "Update operation succeeded, but horizontal compaction failed. Please check logs. " + e)
+        // In case of failure, clean all related delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
+
+      case e: Exception =>
+        LOGGER.error("Exception in update operation: " + e)
+        // ****** start clean up.
+        // In case of failure, clean all related delete delta files
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "")
+
+        // *****end clean up.
+        if (null != e.getMessage) {
+          sys.error("Update operation failed. " + e.getMessage)
+        } else if (null != e.getCause && null != e.getCause.getMessage) {
+          sys.error("Update operation failed. " + e.getCause.getMessage)
+        } else {
+          sys.error("Update operation failed. Please check logs.")
+        }
+    } finally {
+      if (null != dataSet && isPersistEnabled) {
+        dataSet.unpersist()
+      }
+      if (lockStatus) {
+        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
+      }
+    }
+    Seq.empty
+  }
+
+  private def performUpdate(
+      dataFrame: Dataset[Row],
+      tableIdentifier: Seq[String],
+      plan: LogicalPlan,
+      sparkSession: SparkSession,
+      currentTime: Long,
+      executorErrors: ExecutionErrors): Unit = {
+
+    def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
+
+      val tableName = relation.identifier.getCarbonTableIdentifier.getTableName
+      val dbName = relation.identifier.getCarbonTableIdentifier.getDatabaseName
+      (tableIdentifier.size > 1 &&
+       tableIdentifier(0) == dbName &&
+       tableIdentifier(1) == tableName) ||
+      (tableIdentifier(0) == tableName)
+    }
+    def getHeader(relation: CarbonDatasourceHadoopRelation, plan: LogicalPlan): String = {
+      var header = ""
+
+      plan match {
+        case Project(pList, _) =>
+          header = pList
+            .filter(field => !field.name
+              .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+            .map(col => if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION)) {
+              col.name
+                .substring(0, col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
+            } else {
+              col.name
+            }).mkString(",")
+        case _ => // not a Project node: leave the header empty
+      }
+      header
+    }
+    val ex = dataFrame.queryExecution.analyzed
+    val res = ex find {
+      case relation: LogicalRelation
+        if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           isDestinationRelation(relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]) =>
+        true
+      case _ => false
+    }
+    val carbonRelation: CarbonDatasourceHadoopRelation = res match {
+      case Some(relation: LogicalRelation) =>
+        relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+      case _ => sys.error("The update table was not found in the query plan")
+    }
+
+    val updateTableModel = UpdateTableModel(true, currentTime, executorErrors)
+
+    val header = getHeader(carbonRelation, plan)
+
+    LoadTableCommand(
+      Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName),
+      carbonRelation.identifier.getCarbonTableIdentifier.getTableName,
+      null,
+      Seq(),
+      Map("fileheader" -> header),
+      false,
+      null,
+      Some(dataFrame),
+      Some(updateTableModel)).run(sparkSession)
+
+    executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
+    executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses
+
+  }
+}
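
The file header handed to LoadTableCommand is just the projected column list with the implicit tuple-id column dropped and the update suffix stripped. A standalone sketch of that normalisation on plain strings (both literals below are illustrative stand-ins for the CarbonCommonConstants values):

    // Standalone sketch of the column-name normalisation done in getHeader.
    // The two literals are assumed values, not the real constants.
    def toLoadHeader(projectedColumns: Seq[String]): String = {
      val tupleIdColumn = "tupleId"            // assumed implicit column name
      val updatedSuffix = "-updatedColumn"     // assumed UPDATED_COL_EXTENSION value
      projectedColumns
        .filterNot(_.equalsIgnoreCase(tupleIdColumn))
        .map { name =>
          if (name.endsWith(updatedSuffix)) {
            name.substring(0, name.lastIndexOf(updatedSuffix))
          } else {
            name
          }
        }
        .mkString(",")
    }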

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
new file mode 100644
index 0000000..07ef555
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.language.implicitConversions
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+object Checker {
+  def validateTableExists(
+      dbName: Option[String],
+      tableName: String,
+      session: SparkSession): Unit = {
+    val identifier = TableIdentifier(tableName, dbName)
+    if (!CarbonEnv.getInstance(session).carbonMetastore.tableExists(identifier)(session)) {
+      val err = s"table ${ dbName.map(_ + ".").getOrElse("") }$tableName not found"
+      LogServiceFactory.getLogService(this.getClass.getName).error(err)
+      throw new IllegalArgumentException(err)
+    }
+  }
+}
+
+/**
+ * Interface for command that modifies schema
+ */
+trait SchemaProcessCommand {
+  def processSchema(sparkSession: SparkSession): Seq[Row]
+}
+
+/**
+ * Interface for command that needs to process data in the file system
+ */
+trait DataProcessCommand {
+  def processData(sparkSession: SparkSession): Seq[Row]
+}
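
These two traits split a command into a schema phase and a data phase. A purely illustrative command wiring them together with RunnableCommand (the class and its empty bodies are hypothetical; the real commands in this patch do the actual work in each phase):

    // Hypothetical command: run() delegates to the schema phase, then the data phase.
    case class ExampleAlterCommand(tableName: String)
      extends RunnableCommand with SchemaProcessCommand with DataProcessCommand {

      override def run(sparkSession: SparkSession): Seq[Row] = {
        processSchema(sparkSession)   // update table metadata first
        processData(sparkSession)     // then act on data in the file system
      }

      override def processSchema(sparkSession: SparkSession): Seq[Row] = Seq.empty

      override def processData(sparkSession: SparkSession): Seq[Row] = Seq.empty
    }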

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
new file mode 100644
index 0000000..1f06aed
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableDropPartitionModel, DataProcessCommand, RunnableCommand, SchemaProcessCommand}
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+case class AlterTableDropCarbonPartitionCommand(
+    model: AlterTableDropPartitionModel)
+  extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
+  val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    if (model.partitionId.equals("0")) {
+      sys.error("Cannot drop the default partition! Please use a delete statement instead!")
+    }
+    processSchema(sparkSession)
+    processData(sparkSession)
+    Seq.empty
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableName = model.tableName
+    val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+    val storePath = relation.tableMeta.storePath
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
+      LOGGER.error(s"Alter table failed. Table not found: $dbName.$tableName")
+      sys.error(s"Alter table failed. Table not found: $dbName.$tableName")
+    }
+    val table = relation.tableMeta.carbonTable
+    val partitionInfo = table.getPartitionInfo(tableName)
+    if (partitionInfo == null) {
+      sys.error(s"Table $tableName is not a partition table.")
+    }
+    val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
+    // keep a copy of partitionIdList before update partitionInfo.
+    // will be used in partition data scan
+    oldPartitionIds.addAll(partitionIds.asJava)
+    val partitionIndex = partitionIds.indexOf(Integer.valueOf(model.partitionId))
+    partitionInfo.getPartitionType match {
+      case PartitionType.HASH => sys.error("Hash partition cannot be dropped!")
+      case PartitionType.RANGE =>
+        val rangeInfo = new util.ArrayList(partitionInfo.getRangeInfo)
+        val rangeToRemove = partitionInfo.getRangeInfo.get(partitionIndex - 1)
+        rangeInfo.remove(rangeToRemove)
+        partitionInfo.setRangeInfo(rangeInfo)
+      case PartitionType.LIST =>
+        val listInfo = new util.ArrayList(partitionInfo.getListInfo)
+        val listToRemove = partitionInfo.getListInfo.get(partitionIndex - 1)
+        listInfo.remove(listToRemove)
+        partitionInfo.setListInfo(listInfo)
+      case PartitionType.RANGE_INTERVAL =>
+        sys.error("Dropping a range interval partition isn't supported yet!")
+    }
+    partitionInfo.dropPartition(partitionIndex)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    // read TableInfo
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
+      dbName, tableName, storePath)
+    val tableSchema = wrapperTableInfo.getFactTable
+    tableSchema.setPartitionInfo(partitionInfo)
+    wrapperTableInfo.setFactTable(tableSchema)
+    wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+    val thriftTable =
+      schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+    thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+      .setTime_stamp(System.currentTimeMillis)
+    carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
+      dbName, tableName, storePath)
+    CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
+    // update the schema modified time
+    carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+    // sparkSession.catalog.refreshTable(tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableName = model.tableName
+    var locks = List.empty[ICarbonLock]
+    var success = false
+    try {
+      val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+        LockUsage.COMPACTION_LOCK,
+        LockUsage.DELETE_SEGMENT_LOCK,
+        LockUsage.DROP_TABLE_LOCK,
+        LockUsage.CLEAN_FILES_LOCK,
+        LockUsage.ALTER_PARTITION_LOCK)
+      locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
+        locksToBeAcquired)(sparkSession)
+      val carbonLoadModel = new CarbonLoadModel()
+      val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+        .asInstanceOf[CarbonRelation]
+      val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+      val table = relation.tableMeta.carbonTable
+      val dataLoadSchema = new CarbonDataLoadSchema(table)
+      // Need to fill dimension relation
+      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+      carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
+      carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
+      carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+      val loadStartTime = CarbonUpdateUtil.readCurrentTime
+      carbonLoadModel.setFactTimeStamp(loadStartTime)
+      CarbonDataRDDFactory.alterTableDropPartition(
+        sparkSession.sqlContext,
+        model.partitionId,
+        carbonLoadModel,
+        model.dropWithData,
+        oldPartitionIds.asScala.toList
+      )
+      success = true
+    } catch {
+      case e: Exception =>
+        sys.error(s"Drop Partition failed. Please check logs for more info. ${ e.getMessage }")
+    } finally {
+      CacheProvider.getInstance().dropAllCache()
+      AlterTableUtil.releaseLocks(locks)
+      LOGGER.info("Locks released after alter table drop partition action.")
+      LOGGER.audit("Locks released after alter table drop partition action.")
+    }
+    LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName")
+    LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName")
+    Seq.empty
+  }
+}
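
In the RANGE and LIST branches above, the dropped partition's position in the current partition-id list decides which boundary entry is removed (index partitionIndex - 1). A small standalone sketch of that index arithmetic on immutable Scala collections (the real code mutates the java.util lists held by PartitionInfo):

    // Sketch of the boundary removal done for a RANGE (or LIST) partition drop.
    def dropRangeBoundary(partitionIds: List[Int],
        rangeInfo: List[String],
        partitionId: Int): List[String] = {
      val partitionIndex = partitionIds.indexOf(partitionId)
      require(partitionIndex > 0, s"partition $partitionId not found or is the default partition")
      // partition i (i >= 1) is bounded by rangeInfo(i - 1), so remove that entry
      rangeInfo.patch(partitionIndex - 1, Nil, 1)
    }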

