carbondata-commits mailing list archives

From jack...@apache.org
Subject [2/3] carbondata git commit: [CARBONDATA-940] alter table add/split partition for spark 2.1
Date Thu, 17 Aug 2017 01:29:19 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 3949404..184ab9e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -16,8 +16,28 @@
  */
 package org.apache.spark.util
 
+import java.io.{File, IOException}
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.CarbonInputSplit
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.util.CommonUtil
+
 object PartitionUtils {
 
   def getListInfo(originListInfo: String): List[List[String]] = {
@@ -49,4 +69,119 @@ object PartitionUtils {
     }
     listInfo.toList
   }
+
+  /**
+   * verify the add/split information and update the partitionInfo:
+   *  1. update rangeInfo/listInfo
+   *  2. update partitionIds
+   */
+  def updatePartitionInfo(partitionInfo: PartitionInfo, partitionIdList: List[Int],
+      partitionId: Int, splitInfo: List[String], timestampFormatter: SimpleDateFormat,
+      dateFormatter: SimpleDateFormat): Unit = {
+    val columnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
+    val index = partitionIdList.indexOf(partitionId)
+    if (partitionInfo.getPartitionType == PartitionType.RANGE) {
+      val rangeInfo = partitionInfo.getRangeInfo.asScala.toList
+      val newRangeInfo = partitionId match {
+        case 0 => rangeInfo ++ splitInfo
+        case _ => rangeInfo.take(index - 1) ++ splitInfo ++
+                  rangeInfo.takeRight(rangeInfo.size - index)
+      }
+      CommonUtil.validateRangeInfo(newRangeInfo, columnDataType,
+        timestampFormatter, dateFormatter)
+      partitionInfo.setRangeInfo(newRangeInfo.asJava)
+    } else if (partitionInfo.getPartitionType == PartitionType.LIST) {
+      val originList = partitionInfo.getListInfo.asScala.map(_.asScala.toList).toList
+      if (partitionId != 0) {
+        val targetListInfo = partitionInfo.getListInfo.get(index - 1)
+        CommonUtil.validateSplitListInfo(targetListInfo.asScala.toList, splitInfo, originList)
+      } else {
+        CommonUtil.validateAddListInfo(splitInfo, originList)
+      }
+      val addListInfo = PartitionUtils.getListInfo(splitInfo.mkString(","))
+      val newListInfo = partitionId match {
+        case 0 => originList ++ addListInfo
+        case _ => originList.take(index - 1) ++ addListInfo ++
+                  originList.takeRight(originList.size - index)
+      }
+      partitionInfo.setListInfo(newListInfo.map(_.asJava).asJava)
+    }
+
+    if (partitionId == 0) {
+      partitionInfo.addPartition(splitInfo.size)
+    } else {
+      partitionInfo.splitPartition(index, splitInfo.size)
+    }
+  }
+
+  /**
+   * Used by alter table partition commands to get segmentProperties on the Spark node
+   * @param identifier           absolute table identifier of the carbon table
+   * @param segmentId            id of the segment whose blocks are read
+   * @param partitionIds         partition ids to be scanned
+   * @param oldPartitionIdList   task id group before partition info is changed
+   * @param partitionInfo        current partition info of the table
+   * @return segment properties built from the footer of the first block
+   */
+  def getSegmentProperties(identifier: AbsoluteTableIdentifier, segmentId: String,
+      partitionIds: List[String], oldPartitionIdList: List[Int],
+      partitionInfo: PartitionInfo): SegmentProperties = {
+    val tableBlockInfoList =
+      getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIdList, partitionInfo)
+    val footer = CarbonUtil.readMetadatFile(tableBlockInfoList.get(0))
+    val segmentProperties = new SegmentProperties(footer.getColumnInTable,
+      footer.getSegmentInfo.getColumnCardinality)
+    segmentProperties
+  }
+
+  def getPartitionBlockList(identifier: AbsoluteTableIdentifier, segmentId: String,
+      partitionIds: List[String], oldPartitionIdList: List[Int],
+      partitionInfo: PartitionInfo): java.util.List[TableBlockInfo] = {
+    val jobConf = new JobConf(new Configuration)
+    val job = new Job(jobConf)
+    val format = CarbonInputFormatUtil
+      .createCarbonTableInputFormat(identifier, partitionIds.asJava, job)
+    val splits = format.getSplitsOfOneSegment(job, segmentId,
+      oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo)
+    val blockList = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+    val tableBlockInfoList = CarbonInputSplit.createBlocks(blockList.asJava)
+    tableBlockInfoList
+  }
+
+  @throws(classOf[IOException])
+  def deleteOriginalCarbonFile(identifier: AbsoluteTableIdentifier, segmentId: String,
+      partitionIds: List[String], oldPartitionIdList: List[Int], storePath: String,
+      dbName: String, tableName: String, partitionInfo: PartitionInfo,
+      carbonLoadModel: CarbonLoadModel): Unit = {
+    val newTime = carbonLoadModel.getFactTimeStamp
+    val tableBlockInfoList =
+      getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIdList,
+        partitionInfo).asScala
+    val pathList: util.List[String] = new util.ArrayList[String]()
+    val carbonTablePath = new CarbonTablePath(storePath, dbName, tableName)
+    tableBlockInfoList.foreach{ tableBlockInfo =>
+      val path = tableBlockInfo.getFilePath
+      val timestamp = CarbonTablePath.DataFileUtil.getTimeStampFromFileName(path)
+      if (timestamp.toLong != newTime) {
+        // add carbondata file
+        pathList.add(path)
+        // add index file
+        val version = tableBlockInfo.getVersion
+        val taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path)
+        val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
+        val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
+        val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
+        val indexFilePath = carbonTablePath.getCarbonIndexFilePath(String.valueOf(taskId), "0",
+          segmentId, batchNo, String.valueOf(bucketNumber), timestamp, version)
+        // indexFilePath could be duplicated when multiple data files relate to one index file
+        if (indexFilePath != null && !pathList.contains(indexFilePath)) {
+          pathList.add(indexFilePath)
+        }
+      }
+    }
+    val files: util.List[File] = new util.ArrayList[File]()
+    for (path <- pathList.asScala) {
+      val file = new File(path)
+      files.add(file)
+    }
+    CarbonUtil.deleteFiles(files.asScala.toArray)
+  }
 }
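
For reference, a small standalone sketch of the splice performed by updatePartitionInfo above
when an existing range partition is split (the boundary values and the index are illustrative):

    // rangeInfo holds the current range boundaries; splitInfo holds the boundaries that
    // replace the partition at position `index` of the partition id list
    val rangeInfo = List("2014/01/01", "2015/01/01", "2016/01/01")
    val splitInfo = List("2014/06/01", "2015/01/01")
    val index = 2
    val newRangeInfo = rangeInfo.take(index - 1) ++ splitInfo ++
                       rangeInfo.takeRight(rangeInfo.size - index)
    // newRangeInfo == List("2014/01/01", "2014/06/01", "2015/01/01", "2016/01/01")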

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5ede835..f556a05 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD, UpdateCoalescedRDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil
 
@@ -47,23 +47,23 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException}
 import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException}
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark._
+import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
 import org.apache.carbondata.spark.load.{FailureCauses, _}
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
@@ -178,6 +178,26 @@ object CarbonDataRDDFactory {
     }
   }
 
+  def alterTableSplitPartition(sqlContext: SQLContext,
+      partitionId: String,
+      carbonLoadModel: CarbonLoadModel,
+      storePath: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    LOGGER.audit(s"Add/Split partition request received for table " +
+         s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+    try {
+      startSplitThreads(sqlContext,
+        carbonLoadModel,
+        storePath,
+        partitionId,
+        oldPartitionIdList)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception while starting the partition split threads: ${ e.getMessage }")
+        throw e
+    }
+  }
+
   def handleCompactionForSystemLocking(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
@@ -345,6 +365,73 @@ object CarbonDataRDDFactory {
     compactionThread.run()
   }
 
+  case class SplitThread(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      storePath: String,
+      segmentId: String,
+      partitionId: String,
+      oldPartitionIdList: List[Int]) extends Thread {
+      override def run(): Unit = {
+        var triggeredSplitPartitionStatus = false
+        var exception: Exception = null
+        try {
+          DataManagementFunc.executePartitionSplit(sqlContext,
+            carbonLoadModel, executor, storePath, segmentId, partitionId,
+            oldPartitionIdList)
+          triggeredSplitPartitionStatus = true
+        } catch {
+          case e: Exception =>
+            LOGGER.error(s"Exception in partition split thread: ${ e.getMessage }")
+            exception = e
+        }
+        if (!triggeredSplitPartitionStatus) {
+          throw new Exception("Exception in split partition: " + exception.getMessage)
+        }
+      }
+  }
+
+  def startSplitThreads(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      storePath: String,
+      partitionId: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    val numberOfCores = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
+        CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+    val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
+    try {
+      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
+      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      val threadArray: Array[SplitThread] = new Array[SplitThread](validSegments.size)
+      var i = 0
+      validSegments.foreach { segmentId =>
+        threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor, storePath,
+          segmentId, partitionId, oldPartitionIdList)
+        threadArray(i).start()
+        i += 1
+      }
+      threadArray.foreach {
+        thread => thread.join()
+      }
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception when splitting partition: ${ e.getMessage }")
+        throw e
+    } finally {
+      executor.shutdown()
+      try {
+        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Exception in add/split partition thread while deleting partial load file" +
+                       s" ${ e.getMessage }")
+      }
+    }
+  }
+
   def loadCarbonData(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
@@ -946,7 +1033,7 @@ object CarbonDataRDDFactory {
   }
 
   /**
-   * repartition the input data for partiton table.
+   * repartition the input data for partition table.
    * @param sqlContext
    * @param dataFrame
    * @param carbonLoadModel
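
For orientation, startSplitThreads above launches one SplitThread per valid segment and joins
them all before shutting down the executor. A minimal standalone sketch of that fan-out/join
shape (segment ids and the work body are placeholders, not the real split logic):

    object SegmentFanOutSketch {
      def main(args: Array[String]): Unit = {
        val validSegments = Seq("0", "1", "2")
        val threads = validSegments.map { segmentId =>
          new Thread(new Runnable {
            // placeholder for DataManagementFunc.executePartitionSplit(...)
            override def run(): Unit = println(s"splitting partition data of segment $segmentId")
          })
        }
        threads.foreach(_.start())
        threads.foreach(_.join())
      }
    }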

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 4711618..9cc12cd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.text.SimpleDateFormat
+import java.util
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.language.implicitConversions
@@ -29,20 +32,24 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.util.FileUtils
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
+import org.apache.spark.util.{AlterTableUtil, FileUtils, PartitionUtils}
 import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -183,6 +190,136 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
   }
 }
 
+/**
+ * Command for Alter Table Add and Split Partition.
+ * Add is a special case of splitting the default partition (partition 0).
+ * @param splitPartitionModel model holding the table name, partition id and split info
+ */
+case class AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitPartitionModel)
+  extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+  val tableName = splitPartitionModel.tableName
+  val splitInfo = splitPartitionModel.splitInfo
+  val partitionId = splitPartitionModel.partitionId.toInt
+  var partitionInfo: PartitionInfo = null
+  var carbonMetaStore: CarbonMetaStore = null
+  var relation: CarbonRelation = null
+  var dbName: String = null
+  var storePath: String = null
+  var table: CarbonTable = null
+  var carbonTableIdentifier: CarbonTableIdentifier = null
+  val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+  val timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance
+    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+  val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance
+    .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+  val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+    LockUsage.COMPACTION_LOCK,
+    LockUsage.DELETE_SEGMENT_LOCK,
+    LockUsage.DROP_TABLE_LOCK,
+    LockUsage.CLEAN_FILES_LOCK,
+    LockUsage.ALTER_PARTITION_LOCK)
+
+  // TODO: add a rollback function in case the data processing step fails
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+    processData(sparkSession)
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    dbName = splitPartitionModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+    storePath = relation.tableMeta.storePath
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
+      LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
+      sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+    }
+    table = relation.tableMeta.carbonTable
+    partitionInfo = table.getPartitionInfo(tableName)
+    if (partitionInfo == null) {
+      sys.error(s"Table $tableName is not a partition table.")
+    }
+    if (partitionInfo.getPartitionType == PartitionType.HASH) {
+      sys.error(s"Hash partition table cannot be added or split!")
+    }
+    val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
+    // keep a copy of the partition id list before partitionInfo is updated;
+    // it is used later when scanning the existing partition data
+    oldPartitionIds.addAll(partitionIds.asJava)
+
+    PartitionUtils.updatePartitionInfo(partitionInfo, partitionIds, partitionId,
+      splitInfo, timestampFormatter, dateFormatter)
+
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    // read TableInfo
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
+      dbName, tableName, storePath)
+    val tableSchema = wrapperTableInfo.getFactTable
+    tableSchema.setPartitionInfo(partitionInfo)
+    wrapperTableInfo.setFactTable(tableSchema)
+    wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+    val thriftTable =
+      schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+    carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
+      dbName, tableName, storePath)
+    CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
+    // update the schema modified time
+    carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+    sparkSession.catalog.refreshTable(tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    var locks = List.empty[ICarbonLock]
+    var success = false
+    try {
+      locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
+        locksToBeAcquired)(sparkSession)
+      val carbonLoadModel = new CarbonLoadModel()
+      val dataLoadSchema = new CarbonDataLoadSchema(table)
+      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+      carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
+      carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
+      carbonLoadModel.setStorePath(storePath)
+      val loadStartTime = CarbonUpdateUtil.readCurrentTime
+      carbonLoadModel.setFactTimeStamp(loadStartTime)
+      CarbonDataRDDFactory.alterTableSplitPartition(sparkSession.sqlContext,
+        partitionId.toString,
+        carbonLoadModel,
+        relation.tableMeta.storePath,
+        oldPartitionIds.asScala.toList
+      )
+      success = true
+    } catch {
+      case e: Exception =>
+        success = false
+        sys.error(s"Add/Split Partition failed. Please check logs for more info. ${ e.getMessage }")
+    } finally {
+      AlterTableUtil.releaseLocks(locks)
+      CacheProvider.getInstance().dropAllCache()
+      LOGGER.info("Locks released after alter table add/split partition action.")
+      if (success) {
+        LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
+        LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
+      }
+    }
+    Seq.empty
+  }
+}
+
 case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand
     with SchemaProcessCommand {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 511a61c..780839a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -63,7 +63,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
 
   protected lazy val startCommand: Parser[LogicalPlan] =
-    loadManagement| showLoads | alterTable | restructure | updateTable | deleteRecords
+    loadManagement|showLoads|alterTable|restructure|updateTable|deleteRecords|alterPartition
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
     deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
@@ -71,6 +71,30 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val restructure: Parser[LogicalPlan] =
     alterTableModifyDataType | alterTableDropColumn | alterTableAddColumns
 
+  protected lazy val alterPartition: Parser[LogicalPlan] =
+    alterAddPartition | alterSplitPartition
+
+  protected lazy val alterAddPartition: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~>
+      "(" ~> repsep(stringLit, ",") <~ ")") <~ opt(";") ^^ {
+      case dbName ~ table ~ addInfo =>
+        val alterTableAddPartitionModel =
+          AlterTableSplitPartitionModel(dbName, table, "0", addInfo)
+        AlterTableSplitPartitionCommand(alterTableAddPartitionModel)
+    }
+
+  protected lazy val alterSplitPartition: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (SPLIT ~> PARTITION ~>
+       "(" ~> numericLit <~ ")") ~ (INTO ~> "(" ~> repsep(stringLit, ",") <~ ")") <~ opt(";") ^^ {
+      case dbName ~ table ~ partitionId ~ splitInfo =>
+        val alterTableSplitPartitionModel =
+          AlterTableSplitPartitionModel(dbName, table, partitionId, splitInfo)
+        if (partitionId.toInt == 0) {
+          sys.error("Please use [Alter Table Add Partition] statement to split default partition!")
+        }
+        AlterTableSplitPartitionCommand(alterTableSplitPartitionModel)
+    }
+
   protected lazy val alterTable: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) <~ opt(";")  ^^ {
       case dbName ~ table ~ (compact ~ compactType) =>
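
The two parser rules added above accept statements of the following shape (table names and
values are illustrative; ADD always targets the default partition 0, while SPLIT takes an
explicit non-zero partition id):

    val addStmt   = "ALTER TABLE t1 ADD PARTITION ('2017/01/01', '2018/01/01')"
    val splitStmt = "ALTER TABLE db1.t2 SPLIT PARTITION(2) INTO ('UK', '(Canada, Russia)')"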

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 41d6bd3..b4389a6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -123,7 +123,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       // validate partition clause
       if (partitionerFields.nonEmpty) {
         if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
-          throw new MalformedCarbonCommandException("Invalid partition definition")
+          throw new MalformedCarbonCommandException("Error: Invalid partition definition")
         }
         // partition columns should not be part of the schema
         val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
new file mode 100644
index 0000000..7d86468
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.partition
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
+
+
+  override def beforeAll {
+    dropTable
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    /**
+     * list_table_area_origin
+     * list_table_area
+     */
+    sql("""
+          | CREATE TABLE IF NOT EXISTS list_table_area_origin
+          | (
+          | id Int,
+          | vin string,
+          | logdate Timestamp,
+          | phonenumber Long,
+          | country string,
+          | salary Int
+          | )
+          | PARTITIONED BY (area string)
+          | STORED BY 'carbondata'
+          | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+          | 'LIST_INFO'='Asia, America, Europe')
+        """.stripMargin)
+    sql("""
+          | CREATE TABLE IF NOT EXISTS list_table_area
+          | (
+          | id Int,
+          | vin string,
+          | logdate Timestamp,
+          | phonenumber Long,
+          | country string,
+          | salary Int
+          | )
+          | PARTITIONED BY (area string)
+          | STORED BY 'carbondata'
+          | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+          | 'LIST_INFO'='Asia, America, Europe')
+        """.stripMargin)
+
+    /**
+     * range_table_logdate_origin
+     * range_table_logdate
+     */
+    sql(
+      """
+        | CREATE TABLE IF NOT EXISTS range_table_logdate_origin
+        | (
+        | id Int,
+        | vin string,
+        | phonenumber Long,
+        | country string,
+        | area string,
+        | salary Int
+        | )
+        | PARTITIONED BY (logdate Timestamp)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+        | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01')
+      """.stripMargin)
+    sql(
+      """
+        | CREATE TABLE IF NOT EXISTS range_table_logdate
+        | (
+        | id Int,
+        | vin string,
+        | phonenumber Long,
+        | country string,
+        | area string,
+        | salary Int
+        | )
+        | PARTITIONED BY (logdate Timestamp)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+        | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01')
+      """.stripMargin)
+
+    /**
+     * list_table_country_origin
+     * list_table_country
+     */
+    sql(
+      """
+        | CREATE TABLE IF NOT EXISTS list_table_country_origin
+        | (
+        | id Int,
+        | vin string,
+        | logdate Timestamp,
+        | phonenumber Long,
+        | area string,
+        | salary Int
+        | )
+        | PARTITIONED BY (country string)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+        | 'LIST_INFO'='(China, US),UK ,Japan,(Canada,Russia, Good, NotGood), Korea ')
+      """.stripMargin)
+    sql(
+      """
+        | CREATE TABLE IF NOT EXISTS list_table_country
+        | (
+        | id Int,
+        | vin string,
+        | logdate Timestamp,
+        | phonenumber Long,
+        | area string,
+        | salary Int
+        | )
+        | PARTITIONED BY (country string)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+        | 'LIST_INFO'='(China, US),UK ,Japan,(Canada,Russia, Good, NotGood), Korea ')
+      """.stripMargin)
+
+    /**
+     * range_table_logdate_split_origin
+     * range_table_logdate_split
+     */
+    sql(
+      """
+        | CREATE TABLE IF NOT EXISTS range_table_logdate_split_origin
+        | (
+        | id Int,
+        | vin string,
+        | phonenumber Long,
+        | country string,
+        | area string,
+        | salary Int
+        | )
+        | PARTITIONED BY (logdate Timestamp)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+        | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01, 2018/01/01')
+      """.stripMargin)
+    sql(
+      """
+        | CREATE TABLE IF NOT EXISTS range_table_logdate_split
+        | (
+        | id Int,
+        | vin string,
+        | phonenumber Long,
+        | country string,
+        | area string,
+        | salary Int
+        | )
+        | PARTITIONED BY (logdate Timestamp)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+        | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01, 2018/01/01')
+      """.stripMargin)
+
+    /**
+     * range_table_bucket_origin
+     * range_table_bucket
+     */
+    sql(
+      """
+        | CREATE TABLE IF NOT EXISTS range_table_bucket_origin
+        | (
+        | id Int,
+        | vin string,
+        | phonenumber Long,
+        | country string,
+        | area string,
+        | salary Int
+        | )
+        | PARTITIONED BY (logdate Timestamp)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+        | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01, 2018/01/01',
+        | 'BUCKETNUMBER'='3',
+        | 'BUCKETCOLUMNS'='country')
+      """.stripMargin)
+    sql(
+      """
+        | CREATE TABLE IF NOT EXISTS range_table_bucket
+        | (
+        | id Int,
+        | vin string,
+        | phonenumber Long,
+        | country string,
+        | area string,
+        | salary Int
+        | )
+        | PARTITIONED BY (logdate Timestamp)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+        | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01, 2018/01/01',
+        | 'BUCKETNUMBER'='3',
+        | 'BUCKETCOLUMNS'='country')
+      """.stripMargin)
+
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE list_table_area_origin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE range_table_logdate_origin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE list_table_country_origin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE range_table_logdate_split_origin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE range_table_bucket_origin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE list_table_area OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE range_table_logdate OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE list_table_country OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE range_table_logdate_split OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE range_table_bucket OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+  }
+
+  test("Alter table add partition: List Partition") {
+    sql("""ALTER TABLE list_table_area ADD PARTITION ('OutSpace', 'Hi')""".stripMargin)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds = partitionInfo.getPartitionIds
+    val list_info = partitionInfo.getListInfo
+    assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo.getMAX_PARTITION == 5)
+    assert(partitionInfo.getNumPartitions == 6)
+    assert(list_info.get(0).get(0) == "Asia")
+    assert(list_info.get(1).get(0) == "America")
+    assert(list_info.get(2).get(0) == "Europe")
+    assert(list_info.get(3).get(0) == "OutSpace")
+    assert(list_info.get(4).get(0) == "Hi")
+    validateDataFiles("default_list_table_area", "0", Seq(0, 1, 2, 4))
+    val result_after = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area")
+    val result_origin = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin")
+    checkAnswer(result_after, result_origin)
+
+    val result_after1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area < 'OutSpace' ")
+    val result_origin1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area < 'OutSpace' ")
+    checkAnswer(result_after1, result_origin1)
+
+    val result_after2 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area <= 'OutSpace' ")
+    val result_origin2 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area <= 'OutSpace' ")
+    checkAnswer(result_after2, result_origin2)
+
+    val result_after3 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area = 'OutSpace' ")
+    val result_origin3 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area = 'OutSpace' ")
+    checkAnswer(result_after3, result_origin3)
+
+    val result_after4 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area > 'OutSpace' ")
+    val result_origin4 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area > 'OutSpace' ")
+    checkAnswer(result_after4, result_origin4)
+
+    val result_after5 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area >= 'OutSpace' ")
+    val result_origin5 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area >= 'OutSpace' ")
+    checkAnswer(result_after5, result_origin5)
+
+    sql("""ALTER TABLE list_table_area ADD PARTITION ('One', '(Two, Three)', 'Four')""".stripMargin)
+    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds1 = partitionInfo1.getPartitionIds
+    val new_list_info = partitionInfo1.getListInfo
+    assert(partitionIds1 == List(0, 1, 2, 3, 4, 5, 6, 7, 8).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo1.getMAX_PARTITION == 8)
+    assert(partitionInfo1.getNumPartitions == 9)
+    assert(new_list_info.get(0).get(0) == "Asia")
+    assert(new_list_info.get(1).get(0) == "America")
+    assert(new_list_info.get(2).get(0) == "Europe")
+    assert(new_list_info.get(3).get(0) == "OutSpace")
+    assert(new_list_info.get(4).get(0) == "Hi")
+    assert(new_list_info.get(5).get(0) == "One")
+    assert(new_list_info.get(6).get(0) == "Two")
+    assert(new_list_info.get(6).get(1) == "Three")
+    assert(new_list_info.get(7).get(0) == "Four")
+    validateDataFiles("default_list_table_area", "0", Seq(0, 1, 2, 4))
+
+    val result_after6 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area")
+    val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin""")
+    checkAnswer(result_after6, result_origin6)
+  }
+
+  test("Alter table add partition: Range Partition") {
+    sql("""ALTER TABLE range_table_logdate ADD PARTITION ('2017/01/01', '2018/01/01')""")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds = partitionInfo.getPartitionIds
+    val range_info = partitionInfo.getRangeInfo
+    assert(partitionIds.size() == 6)
+    assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo.getMAX_PARTITION == 5)
+    assert(range_info.get(0) == "2014/01/01")
+    assert(range_info.get(1) == "2015/01/01")
+    assert(range_info.get(2) == "2016/01/01")
+    assert(range_info.get(3) == "2017/01/01")
+    assert(range_info.get(4) == "2018/01/01")
+    validateDataFiles("default_range_table_logdate", "0", Seq(1, 2, 3, 4, 5))
+    val result_after = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate""")
+    val result_origin = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin""")
+    checkAnswer(result_after, result_origin)
+
+    val result_after1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate < cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate < cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after1, result_origin1)
+
+    val result_after2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate <= cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate <= cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after2, result_origin2)
+
+    val result_after3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate = cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate = cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after3, result_origin3)
+
+    val result_after4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate >= cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate >= cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after4, result_origin4)
+
+    val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after5, result_origin5)
+  }
+
+  test("Alter table split partition: List Partition") {
+    sql("""ALTER TABLE list_table_country SPLIT PARTITION(4) INTO ('Canada', 'Russia', '(Good, NotGood)')""".stripMargin)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds = partitionInfo.getPartitionIds
+    val list_info = partitionInfo.getListInfo
+    assert(partitionIds == List(0, 1, 2, 3, 6, 7, 8, 5).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo.getMAX_PARTITION == 8)
+    assert(partitionInfo.getNumPartitions == 8)
+    assert(list_info.get(0).get(0) == "China")
+    assert(list_info.get(0).get(1) == "US")
+    assert(list_info.get(1).get(0) == "UK")
+    assert(list_info.get(2).get(0) == "Japan")
+    assert(list_info.get(3).get(0) == "Canada")
+    assert(list_info.get(4).get(0) == "Russia")
+    assert(list_info.get(5).get(0) == "Good")
+    assert(list_info.get(5).get(1) == "NotGood")
+    assert(list_info.get(6).get(0) == "Korea")
+    validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3, 8))
+    val result_after = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country""")
+    val result_origin = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin""")
+    checkAnswer(result_after, result_origin)
+
+    val result_after1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country < 'NotGood' """)
+    val result_origin1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country < 'NotGood' """)
+    checkAnswer(result_after1, result_origin1)
+
+    val result_after2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country <= 'NotGood' """)
+    val result_origin2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country <= 'NotGood' """)
+    checkAnswer(result_after2, result_origin2)
+
+    val result_after3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country = 'NotGood' """)
+    val result_origin3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country = 'NotGood' """)
+    checkAnswer(result_after3, result_origin3)
+
+    val result_after4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country >= 'NotGood' """)
+    val result_origin4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country >= 'NotGood' """)
+    checkAnswer(result_after4, result_origin4)
+
+    val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country > 'NotGood' """)
+    val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country > 'NotGood' """)
+    checkAnswer(result_after5, result_origin5)
+  }
+
+  test("Alter table split partition: Range Partition") {
+    sql("""ALTER TABLE range_table_logdate_split SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds = partitionInfo.getPartitionIds
+    val rangeInfo = partitionInfo.getRangeInfo
+    assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo.getMAX_PARTITION == 6)
+    assert(partitionInfo.getNumPartitions == 6)
+    assert(rangeInfo.get(0) == "2014/01/01")
+    assert(rangeInfo.get(1) == "2015/01/01")
+    assert(rangeInfo.get(2) == "2016/01/01")
+    assert(rangeInfo.get(3) == "2017/01/01")
+    assert(rangeInfo.get(4) == "2018/01/01")
+    validateDataFiles("default_range_table_logdate_split", "0", Seq(1, 2, 3, 5, 6))
+    val result_after = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split""")
+    val result_origin = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin""")
+    checkAnswer(result_after, result_origin)
+
+    val result_after1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate < cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate < cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after1, result_origin1)
+
+    val result_after2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate <= cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate <= cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after2, result_origin2)
+
+    val result_after3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate = cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate = cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after3, result_origin3)
+
+    val result_after4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate >= cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate >= cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after4, result_origin4)
+
+    val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after5, result_origin5)
+  }
+
+  test("Alter table split partition: Range Partition + Bucket") {
+    sql("""ALTER TABLE range_table_bucket SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds = partitionInfo.getPartitionIds
+    val rangeInfo = partitionInfo.getRangeInfo
+    assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo.getMAX_PARTITION == 6)
+    assert(partitionInfo.getNumPartitions == 6)
+    assert(rangeInfo.get(0) == "2014/01/01")
+    assert(rangeInfo.get(1) == "2015/01/01")
+    assert(rangeInfo.get(2) == "2016/01/01")
+    assert(rangeInfo.get(3) == "2017/01/01")
+    assert(rangeInfo.get(4) == "2018/01/01")
+    validateDataFiles("default_range_table_bucket", "0", Seq(1, 2, 3, 5, 6))
+    val result_after = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""")
+    val result_origin = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin""")
+    checkAnswer(result_after, result_origin)
+
+    val result_after1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate < cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after1, result_origin1)
+
+    val result_after2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate <= cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate <= cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after2, result_origin2)
+
+    val result_after3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate = cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate = cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after3, result_origin3)
+
+    val result_after4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate >= cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate >= cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after4, result_origin4)
+
+    val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
+    val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
+    checkAnswer(result_after5, result_origin5)
+  }
+
+  def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Seq[Int]): Unit = {
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+    val dataFiles = getDataFiles(carbonTable, segmentId)
+    validatePartitionTableFiles(partitions, dataFiles)
+  }
+
+  def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
+    val tablePath = new CarbonTablePath(carbonTable.getStorePath, carbonTable.getDatabaseName,
+      carbonTable.getFactTableName)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      override def accept(file: CarbonFile): Boolean = {
+        return file.getName.endsWith(".carbondata")
+      }
+    })
+    dataFiles
+  }
+
+  /**
+   * Ensure the data files cover exactly the expected partition ids, not merely a subset of them
+   * @param partitions expected partition ids
+   * @param dataFiles  carbondata files found in the segment directory
+   */
+  def validatePartitionTableFiles(partitions: Seq[Int], dataFiles: Array[CarbonFile]): Unit = {
+    val partitionIds: ListBuffer[Int] = new ListBuffer[Int]()
+    dataFiles.foreach { dataFile =>
+      val partitionId = CarbonTablePath.DataFileUtil.getTaskNo(dataFile.getName).split("_")(0).toInt
+      partitionIds += partitionId
+      assert(partitions.contains(partitionId))
+    }
+    partitions.foreach(id => assert(partitionIds.contains(id)))
+  }
+
+  override def afterAll = {
+    dropTable
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+  }
+
+  def dropTable {
+    sql("DROP TABLE IF EXISTS list_table_area_origin")
+    sql("DROP TABLE IF EXISTS range_table_logdate_origin")
+    sql("DROP TABLE IF EXISTS list_table_country_origin")
+    sql("DROP TABLE IF EXISTS range_table_logdate_split_origin")
+    sql("DROP TABLE IF EXISTS range_table_bucket_origin")
+    sql("DROP TABLE IF EXISTS list_table_area")
+    sql("DROP TABLE IF EXISTS range_table_logdate")
+    sql("DROP TABLE IF EXISTS list_table_country")
+    sql("DROP TABLE IF EXISTS range_table_logdate_split")
+    sql("DROP TABLE IF EXISTS range_table_bucket")
+  }
+
+
+}
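A minimal illustration of how the helpers above are meant to be called from a test case; the table name, segment id and partition ids are placeholders, not values taken from this patch.

    // Placeholder values: after loading data into an added/split partition of
    // default.range_table_bucket, every data file in segment "0" must carry one of the
    // expected partition ids, and every expected id must appear in at least one file.
    validateDataFiles("default_range_table_bucket", "0", Seq(0, 1, 2, 3))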

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 56aaf54..095e5a3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -402,7 +402,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
   private void initTempStoreLocation() {
     tempStoreLocation = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(carbonLoadModel.getDatabaseName(), tableName,
-            carbonLoadModel.getTaskNo(), carbonLoadModel.getPartitionId(), segmentId, true);
+            carbonLoadModel.getTaskNo(), carbonLoadModel.getPartitionId(), segmentId,
+            true, false);
   }
 
   /**
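This hunk, and every getLocalDataFolderLocation/getTempStoreLocationKey hunk that follows down through SortParameters.java, makes the same mechanical change: a second boolean argument is appended, and all existing load and compaction call sites pass false for it. The patch does not show the new parameter's name, so the sketch below only illustrates the call shape; the flag name isAlterPartitionFlow is an assumption, chosen because the true path is presumably taken only by the add/split partition flow introduced in this change. databaseName, tableName, taskNo, partitionId and segmentId are stand-ins.

    import org.apache.carbondata.processing.util.CarbonDataProcessorUtil

    // Sketch only: the name of the new second flag is assumed (isAlterPartitionFlow);
    // the patch shows only that existing load/compaction callers pass false for it.
    val tempStoreLocations: Array[String] = CarbonDataProcessorUtil.getLocalDataFolderLocation(
      databaseName, tableName, taskNo, partitionId, segmentId,
      false /* isCompactionFlow */, false /* assumed: isAlterPartitionFlow */)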

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 518d64b..ccb25e6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -143,7 +143,7 @@ public final class DataLoadProcessBuilder {
     String tableName = loadModel.getTableName();
     String tempLocationKey = CarbonDataProcessorUtil
         .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
-            loadModel.getTaskNo(), false);
+            loadModel.getTaskNo(), false, false);
     CarbonProperties.getInstance().addProperty(tempLocationKey,
         StringUtils.join(storeLocation, File.pathSeparator));
     CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index 1aa06f6..c3cf3c0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -72,7 +72,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
         CarbonDataProcessorUtil.getLocalDataFolderLocation(
             sortParameters.getDatabaseName(), sortParameters.getTableName(),
             String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(),
-            sortParameters.getSegmentId() + "", false);
+            sortParameters.getSegmentId() + "", false, false);
     // Set the data file location
     String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index 7314b1e..851c384 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -137,7 +137,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
     String[] storeLocation = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
             String.valueOf(sortParameters.getTaskNo()), bucketId,
-            sortParameters.getSegmentId() + "", false);
+            sortParameters.getSegmentId() + "", false, false);
     // Set the data file location
     String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -188,7 +188,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
     String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(parameters.getDatabaseName(),
             parameters.getTableName(), parameters.getTaskNo(),
-            parameters.getPartitionID(), parameters.getSegmentId(), false);
+            parameters.getPartitionID(), parameters.getSegmentId(), false, false);
     String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     parameters.setTempFileLocation(tmpLocs);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index e140d86..ebb85b4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -224,7 +224,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
       String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
           .getLocalDataFolderLocation(parameters.getDatabaseName(),
             parameters.getTableName(), parameters.getTaskNo(), batchCount + "",
-            parameters.getSegmentId(), false);
+            parameters.getSegmentId(), false, false);
       String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
           File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
       parameters.setTempFileLocation(tempDirs);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
index e508654..f000619 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
@@ -127,7 +127,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter {
     String[] storeLocation = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
             String.valueOf(sortParameters.getTaskNo()), bucketId,
-            sortParameters.getSegmentId() + "", false);
+            sortParameters.getSegmentId() + "", false, false);
     // Set the data file location
     String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -170,7 +170,8 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter {
   private void setTempLocation(SortParameters parameters) {
     String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
-            parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(), false);
+            parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(),
+            false, false);
     String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     parameters.setTempFileLocation(tmpLoc);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
index fe9257f..765e0ed 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -91,7 +91,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     String[] storeLocation = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
             tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
-            configuration.getSegmentId() + "", false);
+            configuration.getSegmentId() + "", false, false);
     CarbonDataProcessorUtil.createLocations(storeLocation);
     return storeLocation;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
index c5f2479..fc4d4d2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
@@ -62,7 +62,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
     String[] storeLocation = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
             tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
-            configuration.getSegmentId() + "", false);
+            configuration.getSegmentId() + "", false, false);
     CarbonDataProcessorUtil.createLocations(storeLocation);
     return storeLocation;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index aad874b..c7af420 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -69,7 +69,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     String[] storeLocation = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
             tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
-            configuration.getSegmentId() + "", false);
+            configuration.getSegmentId() + "", false, false);
     CarbonDataProcessorUtil.createLocations(storeLocation);
     return storeLocation;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 16cab07..fb2977e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -412,7 +412,7 @@ public class SortParameters implements Serializable {
     String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
             tableIdentifier.getTableName(), configuration.getTaskNo(),
-            configuration.getPartitionId(), configuration.getSegmentId(), false);
+            configuration.getPartitionId(), configuration.getSegmentId(), false, false);
     String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
 
@@ -534,7 +534,7 @@ public class SortParameters implements Serializable {
 
     String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId,
-            isCompactionFlow);
+            isCompactionFlow, false);
     String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     parameters.setTempFileLocation(sortTempDirs);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java
new file mode 100644
index 0000000..39d1234
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.spliter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+public abstract class AbstractCarbonQueryExecutor {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AbstractCarbonQueryExecutor.class.getName());
+  protected CarbonTable carbonTable;
+  protected QueryModel queryModel;
+  protected QueryExecutor queryExecutor;
+  protected Map<String, TaskBlockInfo> segmentMapping;
+
+  /**
+   * Get a query executor and execute the query model over the given block list.
+   *
+   * @param blockList table blocks to scan
+   * @return iterator over the raw batch results
+   */
+  protected CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+      throws QueryExecutionException, IOException {
+    queryModel.setTableBlockInfos(blockList);
+    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    return queryExecutor.execute(queryModel);
+  }
+
+  /**
+   * Prepare a raw-detail query model over all dimensions and measures of the fact table.
+   *
+   * @param blockList table blocks to scan
+   * @return the prepared query model
+   */
+  protected QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
+    QueryModel model = new QueryModel();
+    model.setTableBlockInfos(blockList);
+    model.setForcedDetailRawQuery(true);
+    model.setFilterExpressionResolverTree(null);
+
+    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    for (CarbonDimension dim : dimensions) {
+      // build a query dimension for every dimension of the fact table
+      QueryDimension queryDimension = new QueryDimension(dim.getColName());
+      queryDimension.setDimension(dim);
+      dims.add(queryDimension);
+    }
+    model.setQueryDimension(dims);
+
+    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+    for (CarbonMeasure carbonMeasure : measures) {
+      // build a query measure for every measure of the fact table
+      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
+      queryMeasure.setMeasure(carbonMeasure);
+      msrs.add(queryMeasure);
+    }
+    model.setQueryMeasures(msrs);
+    model.setQueryId(System.nanoTime() + "");
+    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
+    model.setTable(carbonTable);
+    return model;
+  }
+
+  /**
+   * Finish the query executor and clear any
+   * dictionaries cached for the query model.
+   */
+  public void finish() {
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      LOGGER.error(e, "Problem while finish: ");
+    }
+    clearDictionaryFromQueryModel();
+  }
+
+  /**
+   * Clear the dictionary access count once its usage is complete, so that the column
+   * can be evicted from the LRU cache whenever memory reaches the threshold.
+   */
+  private void clearDictionaryFromQueryModel() {
+    if (null != queryModel) {
+      Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+      if (null != columnToDictionaryMapping) {
+        for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
+          CarbonUtil.clearDictionaryCache(entry.getValue());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java
new file mode 100644
index 0000000..7b724ee
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.spliter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
+
+/**
+ * Reads the carbon data blocks of a segment when a partition is added or split.
+ */
+public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonSplitExecutor.class.getName());
+
+  public CarbonSplitExecutor(Map<String, TaskBlockInfo> segmentMapping, CarbonTable carbonTable) {
+    this.segmentMapping = segmentMapping;
+    this.carbonTable = carbonTable;
+  }
+
+  public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId)
+      throws QueryExecutionException, IOException {
+    List<TableBlockInfo> list = null;
+    queryModel = prepareQueryModel(list);
+    List<PartitionSpliterRawResultIterator> resultList
+        = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    TaskBlockInfo taskBlockInfo = segmentMapping.get(segmentId);
+    Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
+    for (String task : taskBlockListMapping) {
+      list = taskBlockInfo.getTableBlockInfoList(task);
+      LOGGER.info("for task -" + task + "-block size is -" + list.size());
+      queryModel.setTableBlockInfos(list);
+      resultList.add(new PartitionSpliterRawResultIterator(executeBlockList(list)));
+    }
+    return resultList;
+  }
+}
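Nothing in this file shows the caller, but the shape of the API suggests the Scala layer drives it roughly as follows: build the executor from the segment-to-block mapping and the table, pull one raw-result iterator per task of the segment being altered, and release executor and dictionary resources when done. All variable names below are stand-ins, and the hasNext/next loop assumes PartitionSpliterRawResultIterator behaves like the other Carbon raw result iterators.

    import scala.collection.JavaConverters._
    import org.apache.carbondata.processing.spliter.CarbonSplitExecutor

    // segmentMapping: java.util.Map[String, TaskBlockInfo], carbonTable: CarbonTable and
    // segmentId: String are assumed to be in scope.
    val splitExecutor = new CarbonSplitExecutor(segmentMapping, carbonTable)
    try {
      splitExecutor.processDataBlocks(segmentId).asScala.foreach { iterator =>
        while (iterator.hasNext) {
          val rawRow = iterator.next()
          // hand rawRow to the writer side (see RowResultSpliterProcessor below)
        }
      }
    } finally {
      // releases the query executor and any cached dictionaries
      splitExecutor.finish()
    }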

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java
new file mode 100644
index 0000000..ea38a53
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.spliter;
+
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.spliter.exception.SliceSpliterException;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class RowResultSpliterProcessor {
+
+  private CarbonFactHandler dataHandler;
+  private SegmentProperties segmentProperties;
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RowResultSpliterProcessor.class.getName());
+
+
+  public RowResultSpliterProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
+      SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
+    CarbonDataProcessorUtil.createLocations(tempStoreLocation);
+    this.segmentProperties = segProp;
+    String tableName = carbonTable.getFactTableName();
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel =
+        CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
+            segProp, tableName, tempStoreLocation);
+    CarbonDataFileAttributes carbonDataFileAttributes =
+        new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+            loadModel.getFactTimeStamp());
+    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
+    carbonFactDataHandlerModel.setBucketId(bucketId);
+    // Note: compaction flow is enabled here only to reuse its decimal type conversion
+    carbonFactDataHandlerModel.setCompactionFlow(true);
+    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
+  }
+
+  public boolean execute(List<Object[]> resultList) {
+    boolean splitStatus;
+    boolean isDataPresent = false;
+
+    try {
+      if (!isDataPresent) {
+        dataHandler.initialise();
+        isDataPresent = true;
+      }
+      for (Object[] row: resultList) {
+        addRow(row);
+      }
+      if (isDataPresent)
+      {
+        this.dataHandler.finish();
+      }
+      splitStatus = true;
+    } catch (SliceSpliterException e) {
+      LOGGER.error(e, e.getMessage());
+      LOGGER.error("Exception in split partition" + e.getMessage());
+      splitStatus = false;
+    } finally {
+      try {
+        if (isDataPresent) {
+          this.dataHandler.closeHandler();
+        }
+      } catch (Exception e) {
+        LOGGER.error("Exception while closing the handler in partition spliter" + e.getMessage());
+        splitStatus = false;
+      }
+    }
+    return splitStatus;
+  }
+
+  private void addRow(Object[] carbonTuple) throws SliceSpliterException {
+    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
+    try {
+      this.dataHandler.addDataToStore(row);
+    } catch (CarbonDataWriterException e) {
+      throw new SliceSpliterException("Problem in writing rows when add/split the partition", e);
+    }
+  }
+}
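Again a hedged sketch, since the calling code is not part of this hunk: the rows produced by the reader above are presumably collected per target partition and written out through a single execute call, which returns false if the write failed. carbonTable, loadModel, segmentProperties, tempStoreLocation (Array[String]), bucketId (Integer), segmentId and iterator are stand-ins assumed to be in scope.

    import org.apache.carbondata.processing.spliter.RowResultSpliterProcessor

    // Collect merger-format rows for one target partition, then write them through
    // the columnar fact handler.
    val rows = new java.util.ArrayList[Array[AnyRef]]()
    while (iterator.hasNext) {
      rows.add(iterator.next())
    }
    val processor = new RowResultSpliterProcessor(
      carbonTable, loadModel, segmentProperties, tempStoreLocation, bucketId)
    if (!processor.execute(rows)) {
      sys.error("add/split partition write failed for segment " + segmentId)
    }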

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java
new file mode 100644
index 0000000..17e679a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.spliter.exception;
+
+import java.util.Locale;
+
+public class SliceSpliterException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public SliceSpliterException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public SliceSpliterException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * This method is used to get the localized message.
+   *
+   * @param locale - A Locale object that represents a specific geographical,
+   *               political, or cultural region.
+   * @return - An empty string; locale-specific messages are not provided.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * getLocalizedMessage
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * getMessage
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+}

