carbondata-commits mailing list archives

From jack...@apache.org
Subject [01/14] incubator-carbondata git commit: parse hash partition table
Date Mon, 08 May 2017 09:48:13 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/12-dev 558366ae8 -> b72a90e06


parse hash partition table


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/c95b311c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/c95b311c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/c95b311c

Branch: refs/heads/12-dev
Commit: c95b311ca2077ef68d4eceddacac633637a9bc1b
Parents: 558366a
Author: lionelcao <whucaolu@gmail.com>
Authored: Tue Apr 25 11:50:10 2017 +0800
Committer: lionelcao <whucaolu@gmail.com>
Committed: Thu May 4 13:49:06 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  2 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 69 ++++++++++----------
 .../execution/command/carbonTableSchema.scala   |  5 +-
 .../org/apache/spark/sql/TableCreator.scala     |  5 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala | 19 +++---
 5 files changed, 52 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
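
For orientation before the diffs: a minimal sketch of the kind of CREATE TABLE statement this change targets. All identifiers and property values below are made up for illustration; only the 'partitioning' and 'partitioncount' keys come from the commit, and the exact DDL shape is an assumption.

object HashPartitionDdlSketch {
  // Hypothetical DDL exercising the new table properties (illustrative only).
  val ddl: String =
    """CREATE TABLE sales (id INT, amount DOUBLE, country STRING)
      |PARTITIONED BY (country STRING)
      |STORED BY 'carbondata'
      |TBLPROPERTIES ('partitioning'='HASH', 'partitioncount'='9')""".stripMargin
}

Note that, per the parser change further down, the partition column appears inside the schema as well as in PARTITIONED BY.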


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c95b311c/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 4a79e0c..841733a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -806,7 +806,7 @@ public final class CarbonCommonConstants {
   public static final String DICTIONARY_EXCLUDE = "dictionary_exclude";
   public static final String DICTIONARY_INCLUDE = "dictionary_include";
   public static final String SORT_COLUMNS = "sort_columns";
-  public static final String PARTITIONCLASS = "partitionclass";
+  public static final String PARTITIONING = "partitioning";
   public static final String PARTITIONCOUNT = "partitioncount";
   public static final String COLUMN_PROPERTIES = "columnproperties";
   // table block size in MB
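
A stand-alone sketch (not CarbonData code) of how the renamed key would be read from a table-properties map; the key strings mirror the constants above, everything else is illustrative.

import scala.collection.mutable

object PropertyKeySketch {
  val PARTITIONING = "partitioning"      // renamed key, was "partitionclass"
  val PARTITIONCOUNT = "partitioncount"

  def main(args: Array[String]): Unit = {
    val tableProperties = mutable.Map(PARTITIONING -> "HASH", PARTITIONCOUNT -> "9")
    // Same lookup pattern as the parser change below.
    val partitioning = tableProperties.getOrElse(PARTITIONING, "")
    val partitionCount = tableProperties.get(PARTITIONCOUNT).map(_.toInt).getOrElse(0)
    println(s"partitioning=$partitioning, partitionCount=$partitionCount")
  }
}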

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c95b311c/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 23d1b58..611481e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -21,7 +21,7 @@ import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{LinkedHashSet, Map}
+import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, Map}
 import scala.language.implicitConversions
 import scala.util.matching.Regex
 
@@ -33,10 +33,13 @@ import org.apache.spark.sql.execution.command._
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.Partitioning
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.processing.constants.LoggerAction
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
 
 /**
  * TODO remove the duplicate code and add the common methods to common class.
@@ -258,7 +261,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
     // get no inverted index columns from table properties.
     val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
-
+    // get partitionInfo
+    val partitionInfo = getPartitionInfo(partitionCols, tableProperties)
     // validate the tableBlockSize from table properties
     CommonUtil.validateTableBlockSize(tableProperties)
 
@@ -275,7 +279,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       Option(noInvertedIdxCols),
       groupCols,
       Some(colProps),
-      bucketFields: Option[BucketFields])
+      bucketFields: Option[BucketFields],
+      partitionInfo)
   }
 
   /**
@@ -347,43 +352,39 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   }
 
   /**
-   * For getting the partitioner Object
-   *
+   * get partition info
    * @param partitionCols
    * @param tableProperties
-   * @return
    */
-  protected def getPartitionerObject(partitionCols: Seq[PartitionerField],
-      tableProperties: Map[String, String]):
-  Option[Partitioner] = {
-
-    // by default setting partition class empty.
-    // later in table schema it is setting to default value.
-    var partitionClass: String = ""
-    var partitionCount: Int = 1
-    var partitionColNames: Array[String] = Array[String]()
-    if (tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).isDefined) {
-      partitionClass = tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).get
-    }
+  protected def getPartitionInfo(partitionCols: Seq[PartitionerField],
+      tableProperties: Map[String, String]): Option[PartitionInfo] = {
+    var partitioning: String = ""
+    var partition_count = 0
 
+    if (tableProperties.get(CarbonCommonConstants.PARTITIONING).isDefined) {
+      partitioning = tableProperties.get(CarbonCommonConstants.PARTITIONING).get
+    }
     if (tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).isDefined) {
-      try {
-        partitionCount = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).get.toInt
-      } catch {
-        case e: Exception => // no need to do anything.
-      }
+      partition_count = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).get.toInt
     }
-
-    partitionCols.foreach(col =>
-      partitionColNames :+= col.partitionColumn
-    )
-
-    // this means user has given partition cols list
-    if (!partitionColNames.isEmpty) {
-      return Option(Partitioner(partitionClass, partitionColNames, partitionCount, null))
+    val cols : ArrayBuffer[ColumnSchema] = new ArrayBuffer[ColumnSchema]()
+    partitionCols.foreach(partition_col => {
+      val columnSchema = new ColumnSchema
+      columnSchema.setDataType(DataTypeConverterUtil.
+        convertToCarbonType(partition_col.dataType.get))
+      columnSchema.setColumnName(partition_col.partitionColumn)
+      cols += columnSchema
+    })
+
+    val partitionInfo: Option[PartitionInfo] = partitioning.toUpperCase() match {
+      case "HASH" => Some(new PartitionInfo(cols.asJava,
+                              Partitioning.HASH, partition_count))
+      case "LIST" => None
+      case "RANGE" => None
+      case "RANGE_INTERVAL" => None
+      case _ => None
     }
-    // if partition cols are not given then no need to do partition.
-    None
+    partitionInfo
   }
 
  protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):
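
To make the new control flow easier to follow outside the parser, here is a self-contained sketch of getPartitionInfo in which PartitionInfo, Partitioning, ColumnSchema and PartitionerField are local stubs, not the real CarbonData classes:

object GetPartitionInfoSketch {
  // Local stand-ins for the CarbonData metadata classes used in the diff.
  object Partitioning extends Enumeration { val HASH, LIST, RANGE, RANGE_INTERVAL = Value }
  case class ColumnSchema(columnName: String, dataType: String)
  case class PartitionInfo(columns: Seq[ColumnSchema], scheme: Partitioning.Value, count: Int)
  case class PartitionerField(partitionColumn: String, dataType: Option[String])

  def getPartitionInfo(partitionCols: Seq[PartitionerField],
      tableProperties: Map[String, String]): Option[PartitionInfo] = {
    val partitioning = tableProperties.getOrElse("partitioning", "")
    // Like the diff, this no longer swallows a malformed count: toInt may throw.
    val partitionCount = tableProperties.get("partitioncount").map(_.toInt).getOrElse(0)
    val cols = partitionCols.map(p => ColumnSchema(p.partitionColumn, p.dataType.getOrElse("")))
    partitioning.toUpperCase match {
      case "HASH" => Some(PartitionInfo(cols, Partitioning.HASH, partitionCount))
      // LIST, RANGE and RANGE_INTERVAL all fall through to None in this commit.
      case _ => None
    }
  }

  def main(args: Array[String]): Unit = {
    val info = getPartitionInfo(
      Seq(PartitionerField("country", Some("string"))),
      Map("partitioning" -> "HASH", "partitioncount" -> "9"))
    println(info) // Some(PartitionInfo(List(ColumnSchema(country,string)),HASH,9))
  }
}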

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c95b311c/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 187512d..6f50ecc 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.{BucketingInfo, SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.metadata.schema._
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.service.CarbonCommonFactory
@@ -60,7 +60,8 @@ case class TableModel(
     columnGroups: Seq[String],
     colProps: Option[util.Map[String,
     util.List[ColumnProperty]]] = None,
-    bucketFields: Option[BucketFields])
+    bucketFields: Option[BucketFields],
+    partitionInfo: Option[PartitionInfo])
 
 case class Field(column: String, var dataType: Option[String], name: Option[String],
     children: Option[List[Field]], parent: String = null,
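
The new field is Option-valued so a non-partitioned table simply carries None; a tiny illustration with stub types (not the real TableModel):

object TableModelOptionSketch {
  case class PartitionInfoStub(scheme: String, partitionCount: Int)
  case class TableModelStub(table: String, partitionInfo: Option[PartitionInfoStub])

  val hashPartitioned = TableModelStub("sales", Some(PartitionInfoStub("HASH", 9)))
  val unpartitioned = TableModelStub("dim_country", None)
}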

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c95b311c/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index 9e14f16..3493905 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -484,6 +484,8 @@ object TableCreator {
     // get no inverted index columns from table properties.
     val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
 
+    val partitionInfo = None
+
     // validate the tableBlockSize from table properties
     CommonUtil.validateTableBlockSize(tableProperties)
 
@@ -500,7 +502,8 @@ object TableCreator {
       Option(noInvertedIdxCols),
       groupCols,
       Some(colProps),
-      bucketFields: Option[BucketFields])
+      bucketFields: Option[BucketFields],
+      partitionInfo)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c95b311c/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 02c1366..b22422b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field,
+PartitionerField, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 
 import org.apache.carbondata.spark.CarbonOption
@@ -97,6 +98,8 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
         operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
       }
       val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitColTypeList)
+      val partitionFields = partitionCols.map(x =>
+                              PartitionerField(x.name, Some(x.dataType.toString), null))
       val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
       val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
         .getOrElse(Map.empty)
@@ -111,18 +114,14 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
                             duplicateColumns.mkString("[", ",", "]"), ctx)
       }
 
-      // For Hive tables, partition columns must not be part of the schema
+      // partition columns must be part of the schema
       val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
-      if (badPartCols.nonEmpty) {
-        operationNotAllowed(s"Partition columns may not be specified in the schema: " +
+      if (badPartCols.isEmpty) {
+        operationNotAllowed(s"Partition columns must be specified in the schema: " +
                             badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
       }
 
-      // Note: Hive requires partition columns to be distinct from the schema, so we need
-      // to include the partition columns here explicitly
-      val schema = cols ++ partitionCols
-
-      val fields = schema.map { col =>
+      val fields = cols.map { col =>
         val x = if (col.dataType.catalogString == "float") {
           '`' + col.name + '`' + " double"
         }
@@ -179,7 +178,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
         convertDbNameToLowerCase(name.database),
         name.table.toLowerCase,
         fields,
-        Seq(),
+        partitionFields,
         tableProperties,
         bucketFields)
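
A stand-alone sketch of the two changes in this file: partition columns mapped to PartitionerField, and the Hive-style "must not be in the schema" check inverted to "must be in the schema". StructFieldStub stands in for Spark's StructField and require stands in for operationNotAllowed; all names are illustrative.

object PartitionColsCheckSketch {
  case class StructFieldStub(name: String, dataType: String)
  case class PartitionerField(partitionColumn: String, dataType: Option[String], format: String)

  def main(args: Array[String]): Unit = {
    val cols = Seq(StructFieldStub("id", "int"), StructFieldStub("country", "string"))
    val partitionCols = Seq(StructFieldStub("country", "string"))

    // Mirrors: partitionCols.map(x => PartitionerField(x.name, Some(x.dataType.toString), null))
    val partitionFields = partitionCols.map(x => PartitionerField(x.name, Some(x.dataType), null))

    // Inverted check: CarbonData expects partition columns inside the schema,
    // unlike Hive, so an empty intersection is now the error case.
    val badPartCols = partitionCols.map(_.name).toSet.intersect(cols.map(_.name).toSet)
    require(badPartCols.nonEmpty, "Partition columns must be specified in the schema")
    println(partitionFields)
  }
}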
 

