carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [2/4] incubator-carbondata git commit: Initial commit
Date Tue, 27 Dec 2016 01:19:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 21864d1..16e35f4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -17,13 +17,9 @@
 
 package org.apache.spark.sql
 
-import java.util.regex.{Matcher, Pattern}
-
 import scala.collection.JavaConverters._
-import scala.collection.mutable.LinkedHashSet
 import scala.collection.mutable.Map
 import scala.language.implicitConversions
-import scala.util.matching.Regex
 
 import org.apache.hadoop.hive.ql.lib.Node
 import org.apache.hadoop.hive.ql.parse._
@@ -31,154 +27,18 @@ import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.execution.ExplainCommand
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.HiveQlWrapper
 
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Parser for All Carbon DDL, DML cases in Unified context
  */
-class CarbonSqlParser() extends AbstractSparkSQLParser {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  protected val AGGREGATE = carbonKeyWord("AGGREGATE")
-  protected val AS = carbonKeyWord("AS")
-  protected val AGGREGATION = carbonKeyWord("AGGREGATION")
-  protected val ALL = carbonKeyWord("ALL")
-  protected val HIGH_CARDINALITY_DIMS = carbonKeyWord("NO_DICTIONARY")
-  protected val BEFORE = carbonKeyWord("BEFORE")
-  protected val BY = carbonKeyWord("BY")
-  protected val CARDINALITY = carbonKeyWord("CARDINALITY")
-  protected val CASCADE = carbonKeyWord("CASCADE")
-  protected val CLASS = carbonKeyWord("CLASS")
-  protected val CLEAN = carbonKeyWord("CLEAN")
-  protected val COLS = carbonKeyWord("COLS")
-  protected val COLUMNS = carbonKeyWord("COLUMNS")
-  protected val CREATE = carbonKeyWord("CREATE")
-  protected val CUBE = carbonKeyWord("CUBE")
-  protected val CUBES = carbonKeyWord("CUBES")
-  protected val DATA = carbonKeyWord("DATA")
-  protected val DATABASE = carbonKeyWord("DATABASE")
-  protected val DATABASES = carbonKeyWord("DATABASES")
-  protected val DELETE = carbonKeyWord("DELETE")
-  protected val DELIMITER = carbonKeyWord("DELIMITER")
-  protected val DESCRIBE = carbonKeyWord("DESCRIBE")
-  protected val DESC = carbonKeyWord("DESC")
-  protected val DETAIL = carbonKeyWord("DETAIL")
-  protected val DIMENSIONS = carbonKeyWord("DIMENSIONS")
-  protected val DIMFOLDERPATH = carbonKeyWord("DIMFOLDERPATH")
-  protected val DROP = carbonKeyWord("DROP")
-  protected val ESCAPECHAR = carbonKeyWord("ESCAPECHAR")
-  protected val EXCLUDE = carbonKeyWord("EXCLUDE")
-  protected val EXPLAIN = carbonKeyWord("EXPLAIN")
-  protected val EXTENDED = carbonKeyWord("EXTENDED")
-  protected val FORMATTED = carbonKeyWord("FORMATTED")
-  protected val FACT = carbonKeyWord("FACT")
-  protected val FIELDS = carbonKeyWord("FIELDS")
-  protected val FILEHEADER = carbonKeyWord("FILEHEADER")
-  protected val SERIALIZATION_NULL_FORMAT = carbonKeyWord("SERIALIZATION_NULL_FORMAT")
-  protected val BAD_RECORDS_LOGGER_ENABLE = carbonKeyWord("BAD_RECORDS_LOGGER_ENABLE")
-  protected val BAD_RECORDS_ACTION = carbonKeyWord("BAD_RECORDS_ACTION")
-  protected val FILES = carbonKeyWord("FILES")
-  protected val FROM = carbonKeyWord("FROM")
-  protected val HIERARCHIES = carbonKeyWord("HIERARCHIES")
-  protected val IN = carbonKeyWord("IN")
-  protected val INCLUDE = carbonKeyWord("INCLUDE")
-  protected val INPATH = carbonKeyWord("INPATH")
-  protected val INTO = carbonKeyWord("INTO")
-  protected val LEVELS = carbonKeyWord("LEVELS")
-  protected val LIKE = carbonKeyWord("LIKE")
-  protected val LOAD = carbonKeyWord("LOAD")
-  protected val LOCAL = carbonKeyWord("LOCAL")
-  protected val MAPPED = carbonKeyWord("MAPPED")
-  protected val MEASURES = carbonKeyWord("MEASURES")
-  protected val MULTILINE = carbonKeyWord("MULTILINE")
-  protected val COMPLEX_DELIMITER_LEVEL_1 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_1")
-  protected val COMPLEX_DELIMITER_LEVEL_2 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_2")
-  protected val OPTIONS = carbonKeyWord("OPTIONS")
-  protected val OUTPATH = carbonKeyWord("OUTPATH")
-  protected val OVERWRITE = carbonKeyWord("OVERWRITE")
-  protected val PARTITION_COUNT = carbonKeyWord("PARTITION_COUNT")
-  protected val PARTITIONDATA = carbonKeyWord("PARTITIONDATA")
-  protected val PARTITIONER = carbonKeyWord("PARTITIONER")
-  protected val QUOTECHAR = carbonKeyWord("QUOTECHAR")
-  protected val RELATION = carbonKeyWord("RELATION")
-  protected val SCHEMA = carbonKeyWord("SCHEMA")
-  protected val SCHEMAS = carbonKeyWord("SCHEMAS")
-  protected val SHOW = carbonKeyWord("SHOW")
-  protected val TABLES = carbonKeyWord("TABLES")
-  protected val TABLE = carbonKeyWord("TABLE")
-  protected val TERMINATED = carbonKeyWord("TERMINATED")
-  protected val TYPE = carbonKeyWord("TYPE")
-  protected val USE = carbonKeyWord("USE")
-  protected val WHERE = carbonKeyWord("WHERE")
-  protected val WITH = carbonKeyWord("WITH")
-  protected val AGGREGATETABLE = carbonKeyWord("AGGREGATETABLE")
-  protected val ABS = carbonKeyWord("abs")
-
-  protected val FOR = carbonKeyWord("FOR")
-  protected val SCRIPTS = carbonKeyWord("SCRIPTS")
-  protected val USING = carbonKeyWord("USING")
-  protected val LIMIT = carbonKeyWord("LIMIT")
-  protected val DEFAULTS = carbonKeyWord("DEFAULTS")
-  protected val ALTER = carbonKeyWord("ALTER")
-  protected val ADD = carbonKeyWord("ADD")
-
-  protected val IF = carbonKeyWord("IF")
-  protected val NOT = carbonKeyWord("NOT")
-  protected val EXISTS = carbonKeyWord("EXISTS")
-  protected val DIMENSION = carbonKeyWord("DIMENSION")
-  protected val STARTTIME = carbonKeyWord("STARTTIME")
-  protected val SEGMENTS = carbonKeyWord("SEGMENTS")
-  protected val SEGMENT = carbonKeyWord("SEGMENT")
-
-  protected val STRING = carbonKeyWord("STRING")
-  protected val INTEGER = carbonKeyWord("INTEGER")
-  protected val TIMESTAMP = carbonKeyWord("TIMESTAMP")
-  protected val DATE = carbonKeyWord("DATE")
-  protected val CHAR = carbonKeyWord("CHAR")
-  protected val NUMERIC = carbonKeyWord("NUMERIC")
-  protected val DECIMAL = carbonKeyWord("DECIMAL")
-  protected val DOUBLE = carbonKeyWord("DOUBLE")
-  protected val SHORT = carbonKeyWord("SMALLINT")
-  protected val INT = carbonKeyWord("INT")
-  protected val BIGINT = carbonKeyWord("BIGINT")
-  protected val ARRAY = carbonKeyWord("ARRAY")
-  protected val STRUCT = carbonKeyWord("STRUCT")
-
-  protected val doubleQuotedString = "\"([^\"]+)\"".r
-  protected val singleQuotedString = "'([^']+)'".r
-
-  protected val newReservedWords =
-    this.getClass
-      .getMethods
-      .filter(_.getReturnType == classOf[Keyword])
-      .map(_.invoke(this).asInstanceOf[Keyword].str)
-
-  override val lexical = {
-    val sqllex = new SqlLexical()
-    sqllex.initialize(newReservedWords)
-    sqllex
-
-  }
-
-  import lexical.Identifier
-
-  implicit def regexToParser(regex: Regex): Parser[String] = {
-    acceptMatch(
-    s"identifier matching regex ${ regex }",
-    { case Identifier(str) if regex.unapplySeq(str).isDefined => str }
-    )
-  }
+class CarbonSqlParser() extends CarbonDDLSqlParser {
 
   override def parse(input: String): LogicalPlan = {
     synchronized {
@@ -196,26 +56,14 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
     }
   }
 
-  /**
-   * This will convert key word to regular expression.
-   *
-   * @param keys
-   * @return
-   */
-  private def carbonKeyWord(keys: String) = {
-    ("(?i)" + keys).r
-  }
-
   override protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
 
-  protected lazy val startCommand: Parser[LogicalPlan] = createDatabase | dropDatabase |
-                                                         loadManagement | describeTable |
-                                                         showLoads | alterTable | createTable
-
-  protected lazy val loadManagement: Parser[LogicalPlan] = deleteLoadsByID | deleteLoadsByLoadDate |
-                                                           cleanFiles | loadDataNew
+  protected lazy val startCommand: Parser[LogicalPlan] =
+    createDatabase | dropDatabase | loadManagement | describeTable |
+    showLoads | alterTable | createTable
 
-  protected val escapedIdentifier = "`([^`]+)`".r
+  protected lazy val loadManagement: Parser[LogicalPlan] =
+    deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
 
   protected lazy val createDatabase: Parser[LogicalPlan] =
     CREATE ~> (DATABASE | SCHEMA) ~> restInput ^^ {
@@ -253,19 +101,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
         DropDatabase(dbName, isCascade, dropDbSql)
     }
 
-  private def reorderDimensions(dims: Seq[Field]): Seq[Field] = {
-    var complexDimensions: Seq[Field] = Seq()
-    var dimensions: Seq[Field] = Seq()
-    dims.foreach { dimension =>
-      dimension.dataType.getOrElse("NIL") match {
-        case "Array" => complexDimensions = complexDimensions :+ dimension
-        case "Struct" => complexDimensions = complexDimensions :+ dimension
-        case _ => dimensions = dimensions :+ dimension
-      }
-    }
-    dimensions ++ complexDimensions
-  }
-
   protected lazy val alterTable: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> restInput ^^ {
       case statement =>
@@ -303,14 +138,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
       }
   }
 
-  private def getScaleAndPrecision(dataType: String): (Int, Int) = {
-    val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
-    m.find()
-    val matchedString: String = m.group(1)
-    val scaleAndPrecision = matchedString.split(",")
-    (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
-  }
-
   /**
    * This function will traverse the tree and logical plan will be formed using that.
    *
@@ -386,6 +213,7 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
                   if(f.dataType.getOrElse("").startsWith("char")) {
                     f.dataType = Some("char")
                   }
+                  f.rawSchema = x
                   fields ++= Seq(f)
                 }
               }
@@ -494,511 +322,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
     }
   }
 
-  /**
-   * This will prepate the Model from the Tree details.
-   *
-   * @param ifNotExistPresent
-   * @param dbName
-   * @param tableName
-   * @param fields
-   * @param partitionCols
-   * @param tableProperties
-   * @return
-   */
-  protected def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
-      , tableName: String, fields: Seq[Field],
-      partitionCols: Seq[PartitionerField],
-      tableProperties: Map[String, String]): TableModel
-  = {
-
-    fields.zipWithIndex.foreach { x =>
-      x._1.schemaOrdinal = x._2
-    }
-    val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
-      fields, tableProperties)
-    if (dims.isEmpty) {
-      throw new MalformedCarbonCommandException(s"Table ${
-        dbName.getOrElse(
-          CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-      }.$tableName"
-                                                +
-                                                " can not be created without key columns. Please " +
-                                                "use DICTIONARY_INCLUDE or " +
-                                                "DICTIONARY_EXCLUDE to set at least one key " +
-                                                "column " +
-                                                "if all specified columns are numeric types")
-    }
-    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
-
-    // column properties
-    val colProps = extractColumnProperties(fields, tableProperties)
-    // get column groups configuration from table properties.
-    val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
-      noDictionaryDims, msrs, dims)
-
-    // get no inverted index columns from table properties.
-    val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
-
-    // validate the tableBlockSize from table properties
-    CommonUtil.validateTableBlockSize(tableProperties)
-
-    TableModel(
-      ifNotExistPresent,
-      dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
-      dbName,
-      tableName,
-      tableProperties,
-      reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
-      msrs.map(f => normalizeType(f)),
-      Option(noDictionaryDims),
-      Option(noInvertedIdxCols),
-      groupCols,
-      Some(colProps))
-  }
-
-  /**
-   * Extract the column groups configuration from table properties.
-   * Based on this Row groups of fields will be determined.
-   *
-   * @param tableProperties
-   * @return
-   */
-  protected def updateColumnGroupsInField(tableProperties: Map[String, String],
-      noDictionaryDims: Seq[String],
-      msrs: Seq[Field],
-      dims: Seq[Field]): Seq[String] = {
-    if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {
-
-      var splittedColGrps: Seq[String] = Seq[String]()
-      val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
-
-      // row groups will be specified in table properties like -> "(col1,col2),(col3,col4)"
-      // here first splitting the value by () . so that the above will be splitted into 2 strings.
-      // [col1,col2] [col3,col4]
-      val m: Matcher = Pattern.compile("\\(([^)]+)\\)").matcher(nonSplitCols)
-      while (m.find()) {
-        val oneGroup: String = m.group(1)
-        CommonUtil.validateColumnGroup(oneGroup, noDictionaryDims, msrs, splittedColGrps, dims)
-        val arrangedColGrp = rearrangedColumnGroup(oneGroup, dims)
-        splittedColGrps :+= arrangedColGrp
-      }
-      // This will  be furthur handled.
-      CommonUtil.arrangeColGrpsInSchemaOrder(splittedColGrps, dims)
-    } else {
-      null
-    }
-  }
-
-  def rearrangedColumnGroup(colGroup: String, dims: Seq[Field]): String = {
-    // if columns in column group is not in schema order than arrange it in schema order
-    var colGrpFieldIndx: Seq[Int] = Seq[Int]()
-    colGroup.split(',').map(_.trim).foreach { x =>
-      dims.zipWithIndex.foreach { dim =>
-        if (dim._1.column.equalsIgnoreCase(x)) {
-          colGrpFieldIndx :+= dim._2
-        }
-      }
-    }
-    // sort it
-    colGrpFieldIndx = colGrpFieldIndx.sorted
-    // check if columns in column group is in schema order
-    if (!checkIfInSequence(colGrpFieldIndx)) {
-      throw new MalformedCarbonCommandException("Invalid column group:" + colGroup)
-    }
-    def checkIfInSequence(colGrpFieldIndx: Seq[Int]): Boolean = {
-      for (i <- 0 until (colGrpFieldIndx.length - 1)) {
-        if ((colGrpFieldIndx(i + 1) - colGrpFieldIndx(i)) != 1) {
-          throw new MalformedCarbonCommandException(
-            "Invalid column group,column in group should be contiguous as per schema.")
-        }
-      }
-      true
-    }
-    val colGrpNames: StringBuilder = StringBuilder.newBuilder
-    for (i <- colGrpFieldIndx.indices) {
-      colGrpNames.append(dims(colGrpFieldIndx(i)).column)
-      if (i < (colGrpFieldIndx.length - 1)) {
-        colGrpNames.append(",")
-      }
-    }
-    colGrpNames.toString()
-  }
-
-  /**
-   * For getting the partitioner Object
-   *
-   * @param partitionCols
-   * @param tableProperties
-   * @return
-   */
-  protected def getPartitionerObject(partitionCols: Seq[PartitionerField],
-      tableProperties: Map[String, String]):
-  Option[Partitioner] = {
-
-    // by default setting partition class empty.
-    // later in table schema it is setting to default value.
-    var partitionClass: String = ""
-    var partitionCount: Int = 1
-    var partitionColNames: Array[String] = Array[String]()
-    if (tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).isDefined) {
-      partitionClass = tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).get
-    }
-
-    if (tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).isDefined) {
-      try {
-        partitionCount = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).get.toInt
-      } catch {
-        case e: Exception => // no need to do anything.
-      }
-    }
-
-    partitionCols.foreach(col =>
-      partitionColNames :+= col.partitionColumn
-    )
-
-    // this means user has given partition cols list
-    if (!partitionColNames.isEmpty) {
-      return Option(Partitioner(partitionClass, partitionColNames, partitionCount, null))
-    }
-    // if partition cols are not given then no need to do partition.
-    None
-  }
-
-  protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):
-  java.util.Map[String, java.util.List[ColumnProperty]] = {
-    val colPropMap = new java.util.HashMap[String, java.util.List[ColumnProperty]]()
-    fields.foreach { field =>
-      if (field.children.isDefined && field.children.get != null) {
-        fillAllChildrenColumnProperty(field.column, field.children, tableProperties, colPropMap)
-      } else {
-        fillColumnProperty(None, field.column, tableProperties, colPropMap)
-      }
-    }
-    colPropMap
-  }
-
-  protected def fillAllChildrenColumnProperty(parent: String, fieldChildren: Option[List[Field]],
-      tableProperties: Map[String, String],
-      colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
-    fieldChildren.foreach(fields => {
-      fields.foreach(field => {
-        fillColumnProperty(Some(parent), field.column, tableProperties, colPropMap)
-      }
-      )
-    }
-    )
-  }
-
-  protected def fillColumnProperty(parentColumnName: Option[String],
-      columnName: String,
-      tableProperties: Map[String, String],
-      colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
-    val (tblPropKey, colProKey) = getKey(parentColumnName, columnName)
-    val colProps = CommonUtil.getColumnProperties(tblPropKey, tableProperties)
-    if (colProps.isDefined) {
-      colPropMap.put(colProKey, colProps.get)
-    }
-  }
-
-  def getKey(parentColumnName: Option[String],
-      columnName: String): (String, String) = {
-    if (parentColumnName.isDefined) {
-      if (columnName == "val") {
-        (parentColumnName.get, parentColumnName.get + "." + columnName)
-      } else {
-        (parentColumnName.get + "." + columnName, parentColumnName.get + "." + columnName)
-      }
-    } else {
-      (columnName, columnName)
-    }
-  }
-
-  /**
-   * This will extract the no inverted columns fields.
-   * By default all dimensions use inverted index.
-   *
-   * @param fields
-   * @param tableProperties
-   * @return
-   */
-  protected def extractNoInvertedIndexColumns(fields: Seq[Field],
-      tableProperties: Map[String, String]):
-  Seq[String] = {
-    // check whether the column name is in fields
-    var noInvertedIdxColsProps: Array[String] = Array[String]()
-    var noInvertedIdxCols: Seq[String] = Seq[String]()
-
-    if (tableProperties.get("NO_INVERTED_INDEX").isDefined) {
-      noInvertedIdxColsProps =
-        tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
-      noInvertedIdxColsProps
-        .map { noInvertedIdxColProp =>
-          if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
-            val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
-                           " does not exist in table. Please check create table statement."
-            throw new MalformedCarbonCommandException(errormsg)
-          }
-        }
-    }
-    // check duplicate columns and only 1 col left
-    val distinctCols = noInvertedIdxColsProps.toSet
-    // extract the no inverted index columns
-    fields.foreach(field => {
-      if (distinctCols.exists(x => x.equalsIgnoreCase(field.column))) {
-        noInvertedIdxCols :+= field.column
-      }
-    }
-    )
-    noInvertedIdxCols
-  }
-
-  /**
-   * This will extract the Dimensions and NoDictionary Dimensions fields.
-   * By default all string cols are dimensions.
-   *
-   * @param fields
-   * @param tableProperties
-   * @return
-   */
-  protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
-      tableProperties: Map[String, String]):
-  (Seq[Field], Seq[String]) = {
-    var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
-    var dictExcludeCols: Array[String] = Array[String]()
-    var noDictionaryDims: Seq[String] = Seq[String]()
-    var dictIncludeCols: Seq[String] = Seq[String]()
-
-    // All excluded cols should be there in create table cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
-      dictExcludeCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
-      dictExcludeCols
-        .map { dictExcludeCol =>
-          if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
-            val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
-                           " does not exist in table. Please check create table statement."
-            throw new MalformedCarbonCommandException(errormsg)
-          } else {
-            val dataType = fields.find(x =>
-              x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
-            if (isComplexDimDictionaryExclude(dataType)) {
-              val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
-                             dictExcludeCol
-              throw new MalformedCarbonCommandException(errormsg)
-            } else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
-              val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
-                             " data type column: " + dictExcludeCol
-              throw new MalformedCarbonCommandException(errorMsg)
-            }
-          }
-        }
-    }
-    // All included cols should be there in create table cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
-      dictIncludeCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
-      dictIncludeCols.map { distIncludeCol =>
-        if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
-          val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
-                         " does not exist in table. Please check create table statement."
-          throw new MalformedCarbonCommandException(errormsg)
-        }
-      }
-    }
-
-    // include cols should contain exclude cols
-    dictExcludeCols.foreach { dicExcludeCol =>
-      if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
-        val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
-                       " with DICTIONARY_INCLUDE. Please check create table statement."
-        throw new MalformedCarbonCommandException(errormsg)
-      }
-    }
-
-    // by default consider all String cols as dims and if any dictionary exclude is present then
-    // add it to noDictionaryDims list. consider all dictionary excludes/include cols as dims
-    fields.foreach(field => {
-
-      if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
-        val dataType = DataTypeUtil.getDataType(field.dataType.get.toUpperCase())
-        if (dataType != DataType.TIMESTAMP && dataType != DataType.DATE ) {
-          noDictionaryDims :+= field.column
-        }
-        dimFields += field
-      } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
-        dimFields += (field)
-      } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
-        dimFields += (field)
-      }
-    }
-    )
-
-    (dimFields.toSeq, noDictionaryDims)
-  }
-
-  /**
-   * It fills non string dimensions in dimFields
-   */
-  def fillNonStringDimension(dictIncludeCols: Seq[String],
-      field: Field, dimFields: LinkedHashSet[Field]) {
-    var dictInclude = false
-    if (dictIncludeCols.nonEmpty) {
-      dictIncludeCols.foreach(dictIncludeCol =>
-        if (field.column.equalsIgnoreCase(dictIncludeCol)) {
-          dictInclude = true
-        })
-    }
-    if (dictInclude) {
-      dimFields += field
-    }
-  }
-
-  /**
-   * detect dimention data type
-   *
-   * @param dimensionDatatype
-   */
-  def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
-    val dimensionType = Array("string", "array", "struct", "timestamp", "date", "char")
-    dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
-  }
-
-  /**
-   * detects whether complex dimension is part of dictionary_exclude
-   */
-  def isComplexDimDictionaryExclude(dimensionDataType: String): Boolean = {
-    val dimensionType = Array("array", "struct")
-    dimensionType.exists(x => x.equalsIgnoreCase(dimensionDataType))
-  }
-
-  /**
-   * detects whether double or decimal column is part of dictionary_exclude
-   */
-  def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
-    val dataTypes = Array("string", "timestamp")
-    dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
-  }
-
-  /**
-   * Extract the Measure Cols fields. By default all non string cols will be measures.
-   *
-   * @param fields
-   * @param tableProperties
-   * @return
-   */
-  protected def extractMsrColsFromFields(fields: Seq[Field],
-      tableProperties: Map[String, String]): Seq[Field] = {
-    var msrFields: Seq[Field] = Seq[Field]()
-    var dictIncludedCols: Array[String] = Array[String]()
-    var dictExcludedCols: Array[String] = Array[String]()
-
-    // get all included cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
-      dictIncludedCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
-    }
-
-    // get all excluded cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
-      dictExcludedCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
-    }
-
-    // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
-    fields.foreach(field => {
-      if (!isDetectAsDimentionDatatype(field.dataType.get)) {
-        if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
-            !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
-          msrFields :+= field
-        }
-      }
-    })
-
-    msrFields
-  }
-
-  /**
-   * Extract the DbName and table name.
-   *
-   * @param tableNameParts
-   * @return
-   */
-  protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
-    val (db, tableName) =
-      tableNameParts.getChildren.asScala.map {
-        case Token(part, Nil) => cleanIdentifier(part)
-      } match {
-        case Seq(tableOnly) => (None, tableOnly)
-        case Seq(databaseName, table) => (Some(databaseName), table)
-      }
-
-    (db, tableName)
-  }
-
-  protected def cleanIdentifier(ident: String): String = {
-    ident match {
-      case escapedIdentifier(i) => i
-      case plainIdent => plainIdent
-    }
-  }
-
-  protected def getClauses(clauseNames: Seq[String], nodeList: Seq[ASTNode]): Seq[Option[Node]] = {
-    var remainingNodes = nodeList
-    val clauses = clauseNames.map { clauseName =>
-      val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
-      remainingNodes = nonMatches ++ (if (matches.nonEmpty) {
-        matches.tail
-      } else {
-        Nil
-      })
-      matches.headOption
-    }
-
-    if (remainingNodes.nonEmpty) {
-      sys.error(
-        s"""Unhandled clauses:
-            |You are likely trying to use an unsupported carbon feature."""".stripMargin)
-    }
-    clauses
-  }
-
-  object Token {
-    /** @return matches of the form (tokenName, children). */
-    def unapply(t: Any): Option[(String, Seq[ASTNode])] = {
-      t match {
-        case t: ASTNode =>
-          CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine)
-          Some((t.getText,
-            Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]))
-        case _ => None
-      }
-    }
-  }
-
-  /**
-   * Extract the table properties token
-   *
-   * @param node
-   * @return
-   */
-  protected def getProperties(node: Node): Seq[(String, String)] = {
-    node match {
-      case Token("TOK_TABLEPROPLIST", list) =>
-        list.map {
-          case Token("TOK_TABLEPROPERTY", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
-            (unquoteString(key) -> unquoteString(value))
-        }
-    }
-  }
-
-  protected def unquoteString(str: String) = {
-    str match {
-      case singleQuotedString(s) => s.toLowerCase()
-      case doubleQuotedString(s) => s.toLowerCase()
-      case other => other
-    }
-  }
-
   protected lazy val loadDataNew: Parser[LogicalPlan] =
     LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
     (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
@@ -1015,143 +338,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
           isOverwrite.isDefined)
     }
 
-  private def validateOptions(optionList: Option[List[(String, String)]]): Unit = {
-
-    // validate with all supported options
-    val options = optionList.get.groupBy(x => x._1)
-    val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
-      "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
-      "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
-      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "USE_KETTLE", "DATEFORMAT"
-    )
-    var isSupported = true
-    val invalidOptions = StringBuilder.newBuilder
-    options.foreach(value => {
-      if (!supportedOptions.exists(x => x.equalsIgnoreCase(value._1))) {
-        isSupported = false
-        invalidOptions.append(value._1)
-      }
-
-    }
-    )
-    if (!isSupported) {
-      val errorMessage = "Error: Invalid option(s): " + invalidOptions.toString()
-      throw new MalformedCarbonCommandException(errorMessage)
-    }
-
-    //  COLUMNDICT and ALL_DICTIONARY_PATH can not be used together.
-    if (options.exists(_._1.equalsIgnoreCase("COLUMNDICT")) &&
-        options.exists(_._1.equalsIgnoreCase("ALL_DICTIONARY_PATH"))) {
-      val errorMessage = "Error: COLUMNDICT and ALL_DICTIONARY_PATH can not be used together" +
-                         " in options"
-      throw new MalformedCarbonCommandException(errorMessage)
-    }
-
-    if (options.exists(_._1.equalsIgnoreCase("MAXCOLUMNS"))) {
-      val maxColumns: String = options.get("maxcolumns").get(0)._2
-      try {
-        maxColumns.toInt
-      } catch {
-        case ex: NumberFormatException =>
-          throw new MalformedCarbonCommandException(
-            "option MAXCOLUMNS can only contain integer values")
-      }
-    }
-
-    // check for duplicate options
-    val duplicateOptions = options filter {
-      case (_, optionlist) => optionlist.size > 1
-    }
-    val duplicates = StringBuilder.newBuilder
-    if (duplicateOptions.nonEmpty) {
-      duplicateOptions.foreach(x => {
-        duplicates.append(x._1)
-      }
-      )
-      val errorMessage = "Error: Duplicate option(s): " + duplicates.toString()
-      throw new MalformedCarbonCommandException(errorMessage)
-    }
-  }
-
-  protected lazy val dbTableIdentifier: Parser[Seq[String]] =
-    (ident <~ ".").? ~ (ident) ^^ {
-      case databaseName ~ tableName =>
-        if (databaseName.isDefined) {
-          Seq(databaseName.get, tableName)
-        } else {
-          Seq(tableName)
-        }
-    }
-
-  protected lazy val loadOptions: Parser[(String, String)] =
-    (stringLit <~ "=") ~ stringLit ^^ {
-      case opt ~ optvalue => (opt.trim.toLowerCase(), optvalue)
-      case _ => ("", "")
-    }
-
-
-  protected lazy val dimCol: Parser[Field] = anyFieldDef
-
-  protected lazy val primitiveTypes =
-    STRING ^^^ "string" | INTEGER ^^^ "integer" |
-    TIMESTAMP ^^^ "timestamp" | NUMERIC ^^^ "numeric" |
-    BIGINT ^^^ "bigint" | SHORT ^^^ "smallint" |
-    INT ^^^ "int" | DOUBLE ^^^ "double" | decimalType | DATE ^^^ "date" | charType
-
-  /**
-   * Matching the decimal(10,0) data type and returning the same.
-   */
-  private lazy val charType =
-    CHAR ~ ("(" ~>numericLit <~ ")").? ^^ {
-      case char ~ digit =>
-        s"$char($digit)"
-    }
-
-  /**
-   * Matching the decimal(10,0) data type and returning the same.
-   */
-  private lazy val decimalType =
-  DECIMAL ~ ("(" ~> numericLit <~ ",") ~ (numericLit <~ ")") ^^ {
-    case decimal ~ precision ~ scale =>
-      s"$decimal($precision, $scale)"
-  }
-
-  protected lazy val nestedType: Parser[Field] = structFieldType | arrayFieldType |
-                                                 primitiveFieldType
-
-  protected lazy val anyFieldDef: Parser[Field] =
-    (ident | stringLit) ~ ((":").? ~> nestedType) ~ (IN ~> (ident | stringLit)).? ^^ {
-      case e1 ~ e2 ~ e3 =>
-        Field(e1, e2.dataType, Some(e1), e2.children, null, e3)
-    }
-
-  protected lazy val primitiveFieldType: Parser[Field] =
-    (primitiveTypes) ^^ {
-      case e1 =>
-        Field("unknown", Some(e1), Some("unknown"), Some(null))
-    }
-
-  protected lazy val arrayFieldType: Parser[Field] =
-    ((ARRAY ^^^ "array") ~> "<" ~> nestedType <~ ">") ^^ {
-      case e1 =>
-        Field("unknown", Some("array"), Some("unknown"),
-          Some(List(Field("val", e1.dataType, Some("val"),
-            e1.children))))
-    }
-
-  protected lazy val structFieldType: Parser[Field] =
-    ((STRUCT ^^^ "struct") ~> "<" ~> repsep(anyFieldDef, ",") <~ ">") ^^ {
-      case e1 =>
-        Field("unknown", Some("struct"), Some("unknown"), Some(e1))
-    }
-
-  protected lazy val measureCol: Parser[Field] =
-    (ident | stringLit) ~ (INTEGER ^^^ "integer" | NUMERIC ^^^ "numeric" | SHORT ^^^ "smallint" |
-                           BIGINT ^^^ "bigint" | DECIMAL ^^^ "decimal").? ~
-    (AS ~> (ident | stringLit)).? ~ (IN ~> (ident | stringLit)).? ^^ {
-      case e1 ~ e2 ~ e3 ~ e4 => Field(e1, e2, e3, Some(null))
-    }
-
   protected lazy val describeTable: Parser[LogicalPlan] =
     ((DESCRIBE | DESC) ~> opt(EXTENDED | FORMATTED)) ~ (ident <~ ".").? ~ ident ^^ {
       case ef ~ db ~ tbl =>
@@ -1169,109 +355,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
         }
     }
 
-  private def normalizeType(field: Field): Field = {
-    val dataType = field.dataType.getOrElse("NIL")
-    dataType match {
-      case "string" => Field(field.column, Some("String"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal
-      )
-      case "smallint" => Field(field.column, Some("SmallInt"), field.name, Some(null),
-        field.parent, field.storeType, field.schemaOrdinal
-      )
-      case "integer" | "int" => Field(field.column, Some("Integer"), field.name, Some(null),
-        field.parent, field.storeType, field.schemaOrdinal
-      )
-      case "long" => Field(field.column, Some("Long"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal
-      )
-      case "double" => Field(field.column, Some("Double"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal
-      )
-      case "timestamp" => Field(field.column, Some("Timestamp"), field.name, Some(null),
-        field.parent, field.storeType, field.schemaOrdinal
-      )
-      case "numeric" => Field(field.column, Some("Numeric"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal
-      )
-      case "array" => Field(field.column, Some("Array"), field.name,
-        field.children.map(f => f.map(normalizeType(_))),
-        field.parent, field.storeType, field.schemaOrdinal
-      )
-      case "struct" => Field(field.column, Some("Struct"), field.name,
-        field.children.map(f => f.map(normalizeType(_))),
-        field.parent, field.storeType, field.schemaOrdinal
-      )
-      case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal
-      )
-      case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal, field.precision, field.scale
-      )
-      // checking if the nested data type contains the child type as decimal(10,0),
-      // if it is present then extracting the precision and scale. resetting the data type
-      // with Decimal.
-      case _ if (dataType.startsWith("decimal")) =>
-        val (precision, scale) = getScaleAndPrecision(dataType)
-        Field(field.column,
-          Some("Decimal"),
-          field.name,
-          Some(null),
-          field.parent,
-          field.storeType, field.schemaOrdinal, precision,
-          scale
-        )
-      case _ =>
-        field
-    }
-  }
-
-  private def addParent(field: Field): Field = {
-    field.dataType.getOrElse("NIL") match {
-      case "Array" => Field(field.column, Some("Array"), field.name,
-        field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
-        field.storeType, field.schemaOrdinal)
-      case "Struct" => Field(field.column, Some("Struct"), field.name,
-        field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
-        field.storeType, field.schemaOrdinal)
-      case _ => field
-    }
-  }
-
-  private def appendParentForEachChild(field: Field, parentName: String): Field = {
-    field.dataType.getOrElse("NIL") match {
-      case "String" => Field(parentName + "." + field.column, Some("String"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "SmallInt" => Field(parentName + "." + field.column, Some("SmallInt"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Integer" => Field(parentName + "." + field.column, Some("Integer"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Long" => Field(parentName + "." + field.column, Some("Long"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Double" => Field(parentName + "." + field.column, Some("Double"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Timestamp" => Field(parentName + "." + field.column, Some("Timestamp"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Numeric" => Field(parentName + "." + field.column, Some("Numeric"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Array" => Field(parentName + "." + field.column, Some("Array"),
-        Some(parentName + "." + field.name.getOrElse(None)),
-        field.children
-          .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
-        parentName)
-      case "Struct" => Field(parentName + "." + field.column, Some("Struct"),
-        Some(parentName + "." + field.name.getOrElse(None)),
-        field.children
-          .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
-        parentName)
-      case "BigInt" => Field(parentName + "." + field.column, Some("BigInt"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Decimal" => Field(parentName + "." + field.column, Some("Decimal"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName,
-        field.storeType, field.schemaOrdinal, field.precision, field.scale)
-      case _ => field
-    }
-  }
-
   protected lazy val showLoads: Parser[LogicalPlan] =
     SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
     (LIMIT ~> numericLit).? <~
@@ -1280,13 +363,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
         ShowLoadsCommand(databaseName, tableName.toLowerCase(), limit)
     }
 
-  protected lazy val segmentId: Parser[String] =
-    numericLit ^^ { u => u } |
-    elem("decimal", p => {
-      p.getClass.getSimpleName.equals("FloatLit") ||
-      p.getClass.getSimpleName.equals("DecimalLit")
-    }) ^^ (_.chars)
-
   protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
     DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
                                                    (ident <~ ".").? ~ ident) <~

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d82290e..8ad1203 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -139,7 +139,7 @@ case class CreateTable(cm: TableModel) extends RunnableCommand {
     val dbName = cm.databaseName
     LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
 
-    val tableInfo: TableInfo = TableNewProcessor(cm, sqlContext)
+    val tableInfo: TableInfo = TableNewProcessor(cm)
 
     if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
       sys.error("No Dimensions found. Table should have at least one dimesnion !")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 88e43fd..2943a52 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
-import org.apache.spark.sql.optimizer.{CarbonDecoderRelation}
+import org.apache.spark.sql.hive.{HiveContext, HiveSessionCatalog}
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.types.{StringType, TimestampType}
 
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
@@ -39,3 +42,34 @@ abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
 case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
 
 case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
+
+object getDB {
+
+  def getDatabaseName(dbName: Option[String], sparkSession: SparkSession): String = {
+    dbName.getOrElse(
+      sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCurrentDatabase)
+  }
+}
+
+/**
+ * Shows Loads in a table
+ */
+case class ShowLoadsCommand(databaseNameOp: Option[String], table: String, limit: Option[String])
+  extends Command {
+
+  override def output: Seq[Attribute] = {
+    Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
+      AttributeReference("Status", StringType, nullable = false)(),
+      AttributeReference("Load Start Time", TimestampType, nullable = false)(),
+      AttributeReference("Load End Time", TimestampType, nullable = false)())
+  }
+}
+
+/**
+ * Describe formatted for hive table
+ */
+case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier) extends Command {
+
+  override def output: Seq[AttributeReference] =
+    Seq(AttributeReference("result", StringType, nullable = false)())
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 1fa710e..976a1b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -41,13 +41,13 @@ object CarbonEnv {
 
   var initialized = false
 
-  def init(sqlContext: SQLContext): Unit = {
+  def init(sparkSession: SparkSession): Unit = {
     if (!initialized) {
       val catalog = {
         val storePath =
           CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
         LOGGER.info(s"carbon env initial: $storePath")
-        new CarbonMetastore(sqlContext.sparkSession.conf, storePath)
+        new CarbonMetastore(sparkSession.conf, storePath)
       }
       carbonEnv = CarbonEnv(catalog)
       initialized = true

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
new file mode 100644
index 0000000..748d292
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.SparkSession.Builder
+import org.apache.spark.sql.hive.CarbonSessionState
+import org.apache.spark.sql.internal.SessionState
+
+/**
+ * Session implementation for {org.apache.spark.sql.SparkSession}
+ * Implemented this class only to use our own SQL DDL commands.
+ * User needs to use {CarbonSession.getOrCreateCarbon} to create Carbon session.
+ * @param sc
+ */
+class CarbonSession(sc: SparkContext) extends SparkSession(sc) {
+
+  CarbonEnv.init(this)
+
+  @transient
+  override private[sql] lazy val sessionState: SessionState = new CarbonSessionState(this)
+}
+
+object CarbonSession {
+
+  implicit class CarbonBuilder(builder: Builder) {
+
+
+    def getOrCreateCarbonSession(): SparkSession = synchronized {
+
+      val options =
+        getValue("options", builder).asInstanceOf[scala.collection.mutable.HashMap[String, String]]
+      val userSuppliedContext: Option[SparkContext] =
+        getValue("userSuppliedContext", builder).asInstanceOf[Option[SparkContext]]
+
+      // Get the session from current thread's active session.
+      var session: SparkSession = SparkSession.getActiveSession match {
+        case Some(session) =>
+          if ((session ne null) && !session.sparkContext.isStopped) {
+            options.foreach { case (k, v) => session.conf.set(k, v) }
+            session
+          } else {
+            null
+          }
+        case _ => null
+      }
+      if (session ne null) {
+        return session
+      }
+
+      // Global synchronization so we will only set the default session once.
+      SparkSession.synchronized {
+        // If the current thread does not have an active session, get it from the global session.
+        session = SparkSession.getDefaultSession match {
+          case Some(session) =>
+            if ((session ne null) && !session.sparkContext.isStopped) {
+              options.foreach { case (k, v) => session.conf.set(k, v) }
+              session
+            } else {
+              null
+            }
+          case _ => null
+        }
+        if (session ne null) {
+          return session
+        }
+
+        // No active nor global default session. Create a new one.
+        val sparkContext = userSuppliedContext.getOrElse {
+          // set app name if not given
+          val randomAppName = java.util.UUID.randomUUID().toString
+          val sparkConf = new SparkConf()
+          options.foreach { case (k, v) => sparkConf.set(k, v) }
+          if (!sparkConf.contains("spark.app.name")) {
+            sparkConf.setAppName(randomAppName)
+          }
+          val sc = SparkContext.getOrCreate(sparkConf)
+          // maybe this is an existing SparkContext, update its SparkConf which maybe used
+          // by SparkSession
+          options.foreach { case (k, v) => sc.conf.set(k, v) }
+          if (!sc.conf.contains("spark.app.name")) {
+            sc.conf.setAppName(randomAppName)
+          }
+          sc
+        }
+        session = new CarbonSession(sparkContext)
+        options.foreach { case (k, v) => session.conf.set(k, v) }
+        SparkSession.setDefaultSession(session)
+
+        // Register a successfully instantiated context to the singleton. This should be at the
+        // end of the class definition so that the singleton is updated only if there is no
+        // exception in the construction of the instance.
+        sparkContext.addSparkListener(new SparkListener {
+          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+            SparkSession.setDefaultSession(null)
+            SparkSession.sqlListener.set(null)
+          }
+        })
+      }
+
+      return session
+    }
+
+    /**
+     * It is a hack to get the private field from class.
+     */
+    def getValue(name: String, builder: Builder): Any = {
+      val currentMirror = scala.reflect.runtime.currentMirror
+      val instanceMirror = currentMirror.reflect(builder)
+      val m = currentMirror.classSymbol(builder.getClass).
+        toType.members.find { p =>
+        p.name.toString.equals(name)
+      }.get.asTerm
+      instanceMirror.reflectField(m).get
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index b639ea8..c78ddf3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -48,7 +48,7 @@ class CarbonSource extends CreatableRelationProvider
                                mode: SaveMode,
                                parameters: Map[String, String],
                                data: DataFrame): BaseRelation = {
-    CarbonEnv.init(sqlContext)
+    CarbonEnv.init(sqlContext.sparkSession)
     // User should not specify path since only one store is supported in carbon currently,
     // after we support multi-store, we can remove this limitation
     require(!parameters.contains("path"), "'path' should not be specified, " +
@@ -88,7 +88,7 @@ class CarbonSource extends CreatableRelationProvider
                                sqlContext: SQLContext,
                                parameters: Map[String, String],
                                dataSchema: StructType): BaseRelation = {
-    CarbonEnv.init(sqlContext)
+    CarbonEnv.init(sqlContext.sparkSession)
     addLateDecodeOptimization(sqlContext.sparkSession)
     val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
     CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), parameters,
@@ -97,8 +97,10 @@ class CarbonSource extends CreatableRelationProvider
   }
 
   private def addLateDecodeOptimization(ss: SparkSession): Unit = {
-    ss.sessionState.experimentalMethods.extraStrategies = Seq(new CarbonLateDecodeStrategy)
-    ss.sessionState.experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+    if (ss.sessionState.experimentalMethods.extraStrategies.isEmpty) {
+      ss.sessionState.experimentalMethods.extraStrategies = Seq(new CarbonLateDecodeStrategy)
+      ss.sessionState.experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+    }
   }
 
   private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
@@ -131,7 +133,7 @@ class CarbonSource extends CreatableRelationProvider
         val map = scala.collection.mutable.Map[String, String]();
         parameters.foreach { x => map.put(x._1, x._2) }
         val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
-        CreateTable(cm).run(sparkSession)
+        CreateTable(cm, false).run(sparkSession)
         CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon table", ex)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 5508a94..51b79c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.CatalystTypeConverters._
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -31,8 +30,7 @@ import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.sql.types.{AtomicType, IntegerType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
new file mode 100644
index 0000000..5211f1a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{CarbonEnv, ShowLoadsCommand, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Carbon strategies for ddl commands
+ */
+class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
+
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    plan match {
+      case LoadDataCommand(identifier, path, isLocal, isOverwrite, partition)
+        if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) =>
+        ExecutedCommandExec(LoadTable(identifier.database, identifier.table, path, Seq(),
+          Map(), isOverwrite)) :: Nil
+      case DropTableCommand(identifier, ifNotExists, isView)
+        if CarbonEnv.get.carbonMetastore
+          .isTablePathExists(identifier)(sparkSession) =>
+        ExecutedCommandExec(
+          CarbonDropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil
+      case ShowLoadsCommand(databaseName, table, limit) =>
+        ExecutedCommandExec(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
+      case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
+        CarbonEnv.get.carbonMetastore.createDatabaseDirectory(dbName)
+        ExecutedCommandExec(createDb) :: Nil
+      case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
+        if (isCascade) {
+          val tablesInDB = CarbonEnv.get.carbonMetastore.getAllTables()
+            .filterNot(_.database.exists(_.equalsIgnoreCase(dbName)))
+          tablesInDB.foreach{tableName =>
+            CarbonDropTableCommand(true, Some(dbName), tableName.table).run(sparkSession)
+          }
+        }
+        CarbonEnv.get.carbonMetastore.dropDatabaseDirectory(dbName)
+        ExecutedCommandExec(drop) :: Nil
+      case alterTable@AlterTableCompaction(altertablemodel) =>
+        val isCarbonTable = CarbonEnv.get.carbonMetastore
+          .tableExists(TableIdentifier(altertablemodel.tableName,
+            altertablemodel.dbName))(sparkSession)
+        if (isCarbonTable) {
+          if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
+              altertablemodel.compactionType.equalsIgnoreCase("major")) {
+            ExecutedCommandExec(alterTable) :: Nil
+          } else {
+            throw new MalformedCarbonCommandException(
+              "Unsupported alter operation on carbon table")
+          }
+        } else {
+          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+        }
+      case desc@DescribeTableCommand(identifier, partitionSpec, isExtended, isFormatted)
+        if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) && isFormatted =>
+        val resolvedTable =
+          sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed
+        val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan
+        ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil
+      case _ => Nil
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 68ad4d6..8f97961 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -17,18 +17,27 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.io.File
+import java.text.SimpleDateFormat
+
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
+import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.util.FileUtils
+import org.codehaus.jackson.map.ObjectMapper
 
+import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
@@ -36,19 +45,34 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, GlobalDictionaryUtil}
 
+object Checker {
+  def validateTableExists(
+      dbName: Option[String],
+      tableName: String,
+      session: SparkSession): Unit = {
+    val identifier = TableIdentifier(tableName, dbName)
+    if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(session)) {
+      val err = s"table $dbName.$tableName not found"
+      LogServiceFactory.getLogService(this.getClass.getName).error(err)
+      throw new IllegalArgumentException(err)
+    }
+  }
+}
+
 /**
  * Command for the compaction in alter table command
  *
  * @param alterTableModel
  */
-case class AlterTableCompaction(alterTableModel: AlterTableModel) {
+case class AlterTableCompaction(alterTableModel: AlterTableModel) extends RunnableCommand {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
@@ -109,24 +133,59 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) {
   }
 }
 
-case class CreateTable(cm: TableModel) {
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
-    cm.databaseName = cm.databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+    CarbonEnv.init(sparkSession)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession)
     val tbName = cm.tableName
     val dbName = cm.databaseName
     LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
 
-    val tableInfo: TableInfo = TableNewProcessor(cm, sparkSession.sqlContext)
+    val tableInfo: TableInfo = TableNewProcessor(cm)
 
     if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
       sys.error("No Dimensions found. Table should have at least one dimesnion !")
     }
-    // Add Database to catalog and persist
-    val catalog = CarbonEnv.get.carbonMetastore
-    val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
-    LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
+
+    if (sparkSession.sessionState.catalog.listTables(dbName)
+      .exists(_.table.equalsIgnoreCase(tbName))) {
+      if (!cm.ifNotExistsSet) {
+        LOGGER.audit(
+          s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
+          s"Table [$tbName] already exists under database [$dbName]")
+        sys.error(s"Table [$tbName] already exists under database [$dbName]")
+      }
+    } else {
+      // Add Database to catalog and persist
+      val catalog = CarbonEnv.get.carbonMetastore
+      // Need to fill partitioner class when we support partition
+      val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
+      if (createDSTable) {
+        try {
+          sparkSession.sql(
+            s"""CREATE TABLE $dbName.$tbName
+                |(${(cm.dimCols ++ cm.msrCols).map(f => f.rawSchema).mkString(",")})
+                |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+            s""" OPTIONS (tableName "$tbName", dbName "${dbName}", tablePath "$tablePath") """)
+        } catch {
+          case e: Exception =>
+            val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
+            // call the drop table to delete the created table.
+
+            CarbonEnv.get.carbonMetastore
+              .dropTable(catalog.storePath, identifier)(sparkSession)
+
+            LOGGER.audit(s"Table creation with Database name [$dbName] " +
+                         s"and Table name [$tbName] failed")
+            throw e
+        }
+      }
+
+      LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
+    }
+
     Seq.empty
   }
 
@@ -136,6 +195,54 @@ case class CreateTable(cm: TableModel) {
   }
 }
 
+case class DeleteLoadsById(
+    loadids: Seq[String],
+    databaseNameOp: Option[String],
+    tableName: String) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    CarbonStore.deleteLoadById(
+      loadids,
+      getDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName
+    )
+    Seq.empty
+
+  }
+
+  // validates load ids
+  private def validateLoadIds: Unit = {
+    if (loadids.isEmpty) {
+      val errorMessage = "Error: Segment id(s) should not be empty."
+      throw new MalformedCarbonCommandException(errorMessage)
+
+    }
+  }
+}
+
+case class DeleteLoadsByLoadDate(
+    databaseNameOp: Option[String],
+    tableName: String,
+    dateField: String,
+    loadDate: String) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.TableModel.tableSchema")
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    CarbonStore.deleteLoadByDate(
+      loadDate,
+      getDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName
+    )
+    Seq.empty
+  }
+
+}
 
 object LoadTable {
 
@@ -205,7 +312,7 @@ case class LoadTable(
     options: scala.collection.immutable.Map[String, String],
     isOverwriteExist: Boolean = false,
     var inputSqlString: String = null,
-    dataFrame: Option[DataFrame] = None) {
+    dataFrame: Option[DataFrame] = None) extends RunnableCommand {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -433,3 +540,191 @@ case class LoadTable(
     }
   }
 }
+
+private[sql] case class DeleteLoadByDate(
+    databaseNameOp: Option[String],
+    tableName: String,
+    dateField: String,
+    loadDate: String) {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    CarbonStore.deleteLoadByDate(
+      loadDate,
+      getDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName
+    )
+    Seq.empty
+  }
+
+}
+
+case class CleanFiles(
+    databaseNameOp: Option[String],
+    tableName: String) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
+    CarbonStore.cleanFiles(
+      getDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName,
+      relation.asInstanceOf[CarbonRelation].tableMeta.storePath
+    )
+    Seq.empty
+  }
+}
+
+case class ShowLoads(
+    databaseNameOp: Option[String],
+    tableName: String,
+    limit: Option[String],
+    override val output: Seq[Attribute]) extends RunnableCommand {
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    CarbonStore.showSegments(
+      getDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName,
+      limit
+    )
+  }
+}
+
+case class CarbonDropTableCommand(ifExistsSet: Boolean,
+    databaseNameOp: Option[String],
+    tableName: String)
+  extends RunnableCommand {
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
+    val carbonLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTableIdentifier, LockUsage.DROP_TABLE_LOCK)
+    val storePath = CarbonEnv.get.carbonMetastore.storePath
+    var isLocked = false
+    try {
+      isLocked = carbonLock.lockWithRetries()
+      if (isLocked) {
+        logInfo("Successfully able to get the lock for drop.")
+      }
+      else {
+        LOGGER.audit(s"Dropping table $dbName.$tableName failed as the Table is locked")
+        sys.error("Table is locked for deletion. Please try after some time")
+      }
+      LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
+      CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sparkSession)
+      LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
+    } finally {
+      if (carbonLock != null && isLocked) {
+        if (carbonLock.unlock()) {
+          logInfo("Table MetaData Unlocked Successfully after dropping the table")
+          // deleting any remaining files.
+          val metadataFilePath = CarbonStorePath
+            .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
+          val fileType = FileFactory.getFileType(metadataFilePath)
+          if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+            val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+            CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
+          }
+          // delete bad record log after drop table
+          val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName)
+          val badLogFileType = FileFactory.getFileType(badLogPath)
+          if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
+            val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
+            CarbonUtil.deleteFoldersAndFiles(file)
+          }
+        } else {
+          logError("Unable to unlock Table MetaData")
+        }
+      }
+    }
+    Seq.empty
+  }
+}
+
+private[sql] case class DescribeCommandFormatted(
+    child: SparkPlan,
+    override val output: Seq[Attribute],
+    tblIdentifier: TableIdentifier)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+    val mapper = new ObjectMapper()
+    val colProps = StringBuilder.newBuilder
+    var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
+      val comment = if (relation.metaData.dims.contains(field.name)) {
+        val dimension = relation.metaData.carbonTable.getDimensionByName(
+          relation.tableMeta.carbonTableIdentifier.getTableName,
+          field.name)
+        if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
+          val colprop = mapper.writeValueAsString(dimension.getColumnProperties)
+          colProps.append(field.name).append(".")
+            .append(mapper.writeValueAsString(dimension.getColumnProperties))
+            .append(",")
+        }
+        if (dimension.hasEncoding(Encoding.DICTIONARY) &&
+            !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          "DICTIONARY, KEY COLUMN"
+        } else {
+          "KEY COLUMN"
+        }
+      } else {
+        ("MEASURE")
+      }
+      (field.name, field.dataType.simpleString, comment)
+    }
+    val colPropStr = if (colProps.toString().trim().length() > 0) {
+      // drops additional comma at end
+      colProps.toString().dropRight(1)
+    } else {
+      colProps.toString()
+    }
+    results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
+    results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
+      .getDatabaseName, "")
+    )
+    results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
+    results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
+    val carbonTable = relation.tableMeta.carbonTable
+    results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
+    results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
+    if (colPropStr.length() > 0) {
+      results ++= Seq((colPropStr, "", ""))
+    } else {
+      results ++= Seq(("ADAPTIVE", "", ""))
+    }
+    val dimension = carbonTable
+      .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+    results ++= getColumnGroups(dimension.asScala.toList)
+    results.map { case (name, dataType, comment) =>
+      Row(f"$name%-36s $dataType%-80s $comment%-72s")
+    }
+  }
+
+  private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
+    var results: Seq[(String, String, String)] =
+      Seq(("", "", ""), ("##Column Group Information", "", ""))
+    val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
+      case (groupId, _) => groupId != -1
+    }.toSeq.sortBy(_._1)
+    val groups = groupedDimensions.map(colGroups => {
+      colGroups._2.map(dim => dim.getColName).mkString(", ")
+    })
+    var index = 1
+    groups.foreach { x =>
+      results = results :+ (s"Column Group $index", x, "")
+      index = index + 1
+    }
+    results
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 9638b8f..f174126 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -187,6 +187,15 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
     tables.nonEmpty
   }
 
+  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+    checkSchemasModifiedTimeAndReloadTables()
+    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tables = metadata.tablesMeta.filter(
+      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+    tables.nonEmpty
+  }
+
   def loadMetadata(metadataPath: String, queryId: String): MetaData = {
     val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
     val statistic = new QueryStatistic()

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..066acce
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
+import org.apache.spark.sql.execution.command.DDLStrategy
+import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+
+/**
+ * Session state implementation to override sql parser and adding strategies
+ * @param sparkSession
+ */
+class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf)
+
+  experimentalMethods.extraStrategies =
+    Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
+  experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
new file mode 100644
index 0000000..028286c
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parser
+
+import scala.language.implicitConversions
+
+import org.apache.spark.sql.ShowLoadsCommand
+import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command._
+
+/**
+ * TODO remove the duplicate code and add the common methods to common class.
+ * Parser for All Carbon DDL, DML cases in Unified context
+ */
+class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
+
+  override def parse(input: String): LogicalPlan = {
+    synchronized {
+      // Initialize the Keywords.
+      initLexical
+      phrase(start)(new lexical.Scanner(input)) match {
+        case Success(plan, _) => plan match {
+          case x: LoadTable =>
+            x.inputSqlString = input
+            x
+          case logicalPlan => logicalPlan
+        }
+        case failureOrError => sys.error(failureOrError.toString)
+      }
+    }
+  }
+
+
+  protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
+
+  protected lazy val startCommand: Parser[LogicalPlan] =
+    loadManagement| showLoads | alterTable
+
+  protected lazy val loadManagement: Parser[LogicalPlan] =
+    deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
+
+
+  protected lazy val alterTable: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) <~ opt(";")  ^^ {
+      case dbName ~ table ~ (compact ~ compactType) =>
+        val altertablemodel = AlterTableModel(dbName, table, compactType, null)
+        AlterTableCompaction(altertablemodel)
+    }
+
+
+  protected lazy val loadDataNew: Parser[LogicalPlan] =
+    LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
+    (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
+    (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+      case filePath ~ isOverwrite ~ table ~ optionsList =>
+        val (databaseNameOp, tableName) = table match {
+          case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
+        }
+        if (optionsList.isDefined) {
+          validateOptions(optionsList)
+        }
+        val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
+        LoadTable(databaseNameOp, tableName, filePath, Seq(), optionsMap,
+          isOverwrite.isDefined)
+    }
+
+  protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
+    DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
+                                                   (ident <~ ".").? ~ ident) <~
+    opt(";") ^^ {
+      case loadids ~ table => table match {
+        case databaseName ~ tableName =>
+          DeleteLoadsById(loadids, databaseName, tableName.toLowerCase())
+      }
+    }
+
+  protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
+    DELETE ~> SEGMENTS ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~
+    opt(";") ^^ {
+      case schema ~ table ~ condition =>
+        condition match {
+          case dateField ~ dateValue =>
+            DeleteLoadsByLoadDate(schema, table.toLowerCase(), dateField, dateValue)
+        }
+    }
+
+  protected lazy val cleanFiles: Parser[LogicalPlan] =
+    CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
+      case databaseName ~ tableName => CleanFiles(databaseName, tableName.toLowerCase())
+    }
+
+  protected lazy val explainPlan: Parser[LogicalPlan] =
+    (EXPLAIN ~> opt(EXTENDED)) ~ startCommand ^^ {
+      case isExtended ~ logicalPlan =>
+        logicalPlan match {
+          case plan: CreateTable => ExplainCommand(logicalPlan, extended = isExtended.isDefined)
+          case _ => ExplainCommand(OneRowRelation)
+        }
+    }
+
+  protected lazy val showLoads: Parser[LogicalPlan] =
+    SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    (LIMIT ~> numericLit).? <~
+    opt(";") ^^ {
+      case databaseName ~ tableName ~ limit =>
+        ShowLoadsCommand(databaseName, tableName.toLowerCase(), limit)
+    }
+}


Mime
View raw message