carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [06/14] incubator-carbondata git commit: add range/list parser
Date Mon, 08 May 2017 09:48:18 GMT
add range/list parser


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/3e7c2305
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/3e7c2305
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/3e7c2305

Branch: refs/heads/12-dev
Commit: 3e7c23052443ffa01ea6f7877547561ed0d6045e
Parents: 6677497
Author: lionelcao <whucaolu@gmail.com>
Authored: Fri May 5 01:47:29 2017 +0800
Committer: lionelcao <whucaolu@gmail.com>
Committed: Fri May 5 01:47:29 2017 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/PartitionInfo.java     |  4 --
 .../examples/CarbonPartitionExample.scala       | 31 ++++++++++---
 .../carbondata/spark/util/CommonUtil.scala      |  6 ++-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 46 ++++++++++++--------
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  2 +-
 5 files changed, 59 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e7c2305/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
index 86ef3c5..6a7af9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
@@ -33,9 +33,6 @@ public class PartitionInfo implements Serializable {
    */
   private List<ColumnSchema> columnSchemaList;
 
-  /**
-   * partition type
-   */
   private PartitionType partitionType;
 
   /**
@@ -54,7 +51,6 @@ public class PartitionInfo implements Serializable {
   private int hashNumber;
 
   /**
-   * For range partition table
    * @param columnSchemaList
    * @param partitionType
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e7c2305/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
index 03223f3..0b63c9c 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
@@ -48,6 +48,7 @@ object CarbonPartitionExample {
 
     spark.sparkContext.setLogLevel("WARN")
 
+    // range partition
     spark.sql("DROP TABLE IF EXISTS t1")
 
     spark.sql("""
@@ -61,23 +62,43 @@ object CarbonPartitionExample {
                 | )
                 | PARTITIONED BY (logdate Timestamp)
                 | STORED BY 'carbondata'
-                | TBLPROPERTIES('PARTITION_TYPE'='RANGE','RANGE_INFO'='20140101, 20150101,20160101 ')
+                | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+                | 'RANGE_INFO'='20140101, 2015/01/01 ,2016-01-01, ')
               """.stripMargin)
 
+    // hash partition
     spark.sql("DROP TABLE IF EXISTS t3")
 
     spark.sql("""
-       | CREATE TABLE IF NOT EXISTS t3
+                | CREATE TABLE IF NOT EXISTS t3
+                | (
+                | vin String,
+                | logdate Timestamp,
+                | phonenumber Long,
+                | country String,
+                | area String
+                | )
+                | PARTITIONED BY (vin String)
+                | STORED BY 'carbondata'
+                | TBLPROPERTIES('PARTITION_TYPE'='HASH','HASH_NUMBER'='5')
+                """.stripMargin)
+
+    // list partition
+    spark.sql("DROP TABLE IF EXISTS t5")
+
+    spark.sql("""
+       | CREATE TABLE IF NOT EXISTS t5
        | (
        | vin String,
        | logdate Timestamp,
        | phonenumber Long,
        | country String,
        | area String
-       | )
-       | PARTITIONED BY (vin String)
+       |)
+       | PARTITIONED BY (country string)
        | STORED BY 'carbondata'
-       | TBLPROPERTIES('PARTITION_TYPE'='HASH','PARTITIONCOUNT'='5')
+       | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+       | 'LIST_INFO'='(china,usa),uk,japan,(canada,russia), korea ')
        """.stripMargin)
 
     // spark.sql(s"""

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e7c2305/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index c4701de..301159e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -160,6 +160,8 @@ object CommonUtil {
     var isValid: Boolean = true
     val partitionType = tableProperties.get(CarbonCommonConstants.PARTITION_TYPE)
     val hashNumber = tableProperties.get(CarbonCommonConstants.HASH_NUMBER)
+    val rangeInfo = tableProperties.get(CarbonCommonConstants.RANGE_INFO)
+    val listInfo = tableProperties.get(CarbonCommonConstants.LIST_INFO)
 
     // partition column and partitioning should be both exist or not exist
     if (partitionCols.isEmpty ^ partitionType.isEmpty) {
@@ -167,8 +169,8 @@ object CommonUtil {
     } else if (partitionCols.nonEmpty) {
       partitionType.get.toUpperCase() match {
         case "HASH" => if (!hashNumber.isDefined) isValid = false
-        case "LIST" => isValid = false
-        case "RANGE" => isValid = false
+        case "LIST" => if (!listInfo.isDefined) isValid = false
+        case "RANGE" => if (!rangeInfo.isDefined) isValid = false
         case "RANGE_INTERVAL" => isValid = false
         case _ => isValid = false
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e7c2305/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 7c87b61..30660a8 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -21,7 +21,7 @@ import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, Map}
+import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, ListBuffer, Map}
 import scala.language.implicitConversions
 import scala.util.matching.Regex
 
@@ -360,9 +360,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser
{
       tableProperties: Map[String, String]): Option[PartitionInfo] = {
     var partitionType: String = ""
     var hashNumber = 0
-    var rangeInfo: List[String] = List[String]()
-    var listInfo: List[List[String]] = List[List[String]]()
-
+    var rangeInfo = List[String]()
+    var listInfo = ListBuffer[List[String]]()
+    var templist = ListBuffer[String]()
     if (tableProperties.get(CarbonCommonConstants.PARTITION_TYPE).isDefined) {
       partitionType = tableProperties.get(CarbonCommonConstants.PARTITION_TYPE).get
     }
@@ -370,12 +370,27 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser
{
       hashNumber = tableProperties.get(CarbonCommonConstants.HASH_NUMBER).get.toInt
     }
     if (tableProperties.get(CarbonCommonConstants.RANGE_INFO).isDefined) {
-      rangeInfo = tableProperties.get(CarbonCommonConstants.RANGE_INFO).get.replace(" ","")
+      rangeInfo = tableProperties.get(CarbonCommonConstants.RANGE_INFO).get.replace(" ", "")
         .split(",").toList
     }
     if (tableProperties.get(CarbonCommonConstants.LIST_INFO).isDefined) {
-      rangeInfo = tableProperties.get(CarbonCommonConstants.LIST_INFO).get.replace(" ","")
-        .split(",").toList
+      val arr = tableProperties.get(CarbonCommonConstants.LIST_INFO).get
+                      .replace(" ", "").split(",")
+      val iter = arr.iterator
+      while(iter.hasNext) {
+        val value = iter.next()
+        if (value.startsWith("(")) {
+          templist += value.replace("(", "")
+        } else if (value.endsWith(")")) {
+          templist += value.replace(")", "")
+          listInfo += templist.toList
+          templist.clear()
+        } else {
+          templist += value
+          listInfo += templist.toList
+          templist.clear()
+        }
+      }
     }
     val cols : ArrayBuffer[ColumnSchema] = new ArrayBuffer[ColumnSchema]()
     partitionCols.foreach(partition_col => {
@@ -388,17 +403,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser
{
 
     var partitionInfo : PartitionInfo = null
     partitionType.toUpperCase() match {
-      case "HASH" => {
-        partitionInfo = new PartitionInfo(cols.asJava, PartitionType.HASH)
-        partitionInfo.setHashNumber(hashNumber)
-      }
-      case "RANGE" => {
-        partitionInfo = new PartitionInfo(cols.asJava, PartitionType.RANGE)
-        partitionInfo.setRangeInfo(rangeInfo.asJava)
-      }
-      case "LIST" => {
-        partitionInfo = new PartitionInfo(cols.asJava, PartitionType.LIST)
-      }
+      case "HASH" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.HASH)
+                     partitionInfo.setHashNumber(hashNumber)
+      case "RANGE" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.RANGE)
+                      partitionInfo.setRangeInfo(rangeInfo.asJava)
+      case "LIST" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.LIST)
+                     partitionInfo.setListInfo(listInfo.map(_.asJava).toList.asJava)
     }
     Some(partitionInfo)
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e7c2305/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 9aacc7f..b2bd067 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -119,7 +119,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf)
{
 
       // validate partition clause
       if (!CommonUtil.validatePartitionColumns(tableProperties, partitionCols)) {
-        operationNotAllowed("Invalid Partition definition", ctx)
+        throw new MalformedCarbonCommandException("Invalid partition definition")
       } else if (partitionCols.nonEmpty) {
         // partition columns must be part of the schema
         val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)


Mime
View raw message