carbondata-commits mailing list archives

From jack...@apache.org
Subject [1/4] incubator-carbondata git commit: Initial commit
Date Tue, 27 Dec 2016 01:19:08 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 28190eb71 -> e8dcd4296


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
new file mode 100644
index 0000000..9a3f828
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.parser
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.catalog.CatalogColumn
+import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
+import org.apache.spark.sql.catalyst.parser.ParserUtils._
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{ColTypeListContext, CreateTableContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.{CreateTable, Field, TableModel}
+import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Concrete parser for Spark SQL statements and carbon-specific statements
+ */
+class CarbonSparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
+
+  val astBuilder = new CarbonSqlAstBuilder(conf)
+
+  private val substitutor = new VariableSubstitution(conf)
+
+  protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
+    super.parse(substitutor.substitute(command))(toResult)
+  }
+
+  override def parsePlan(sqlText: String): LogicalPlan = {
+    try {
+      super.parsePlan(sqlText)
+    } catch {
+      case e: Throwable =>
+        astBuilder.parser.parse(sqlText)
+    }
+  }
+}
+
+class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
+
+  val parser = new CarbonSpark2SqlParser
+
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    val fileStorage = Option(ctx.createFileFormat) match {
+      case Some(value) => value.storageHandler().STRING().getSymbol.getText
+      case _ => ""
+    }
+    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+      val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
+      // TODO: implement temporary tables
+      if (temp) {
+        throw new ParseException(
+          "CREATE TEMPORARY TABLE is not supported yet. " +
+          "Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
+      }
+      if (ctx.skewSpec != null) {
+        operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
+      }
+      if (ctx.bucketSpec != null) {
+        operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
+      }
+      val comment = Option(ctx.STRING).map(string)
+      val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
+      val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+      val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
+        .getOrElse(Map.empty)
+
+      // Ensure that no duplicate column name is used in the table definition
+      val colNames = cols.map(_.name)
+      if (colNames.length != colNames.distinct.length) {
+        val duplicateColumns = colNames.groupBy(identity).collect {
+          case (x, ys) if ys.length > 1 => "\"" + x + "\""
+        }
+        operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
+                            duplicateColumns.mkString("[", ",", "]"), ctx)
+      }
+
+      // For Hive tables, partition columns must not be part of the schema
+      val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
+      if (badPartCols.nonEmpty) {
+        operationNotAllowed(s"Partition columns may not be specified in the schema: " +
+                            badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
+      }
+
+      // Note: Hive requires partition columns to be distinct from the schema, so we need
+      // to include the partition columns here explicitly
+      val schema = cols ++ partitionCols
+
+      val fields = schema.map { col =>
+        val x = col.name + ' ' + col.dataType
+        val f: Field = parser.anyFieldDef(new parser.lexical.Scanner(x))
+        match {
+          case parser.Success(field, _) => field.asInstanceOf[Field]
+          case failureOrError => throw new MalformedCarbonCommandException(
+            s"Unsupported data type: ${col.dataType}")
+        }
+        // the data type of the decimal type will be like decimal(10,0)
+        // so checking the start of the string and taking the precision and scale.
+        // resetting the data type with decimal
+        if (f.dataType.getOrElse("").startsWith("decimal")) {
+          val (precision, scale) = parser.getScaleAndPrecision(col.dataType)
+          f.precision = precision
+          f.scale = scale
+          f.dataType = Some("decimal")
+        }
+        if(f.dataType.getOrElse("").startsWith("char")) {
+          f.dataType = Some("char")
+        }
+        f.rawSchema = x
+        f
+      }
+
+      // validate tblProperties
+      if (!CommonUtil.validateTblProperties(properties.asJava.asScala, fields)) {
+        throw new MalformedCarbonCommandException("Invalid table properties")
+      }
+      // prepare table model of the collected tokens
+      val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
+        name.database,
+        name.table,
+        fields,
+        Seq(),
+        properties.asJava.asScala)
+
+      CreateTable(tableModel)
+    } else {
+      super.visitCreateTable(ctx)
+    }
+  }
+
+  /**
+   * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
+   */
+  private def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
+    val props = visitTablePropertyList(ctx)
+    val badKeys = props.filter { case (_, v) => v == null }.keys
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values must be specified for key(s): ${ badKeys.mkString("[", ",", "]") }", ctx)
+    }
+    props
+  }
+
+  private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = {
+    withOrigin(ctx) {
+      ctx.colType.asScala.map { col =>
+        CatalogColumn(
+          col.identifier.getText.toLowerCase,
+          // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
+          // just convert the whole type string to lower case, otherwise the struct field names
+          // will no longer be case sensitive. Instead, we rely on our parser to get the proper
+          // case before passing it to Hive.
+          typedVisit[DataType](col.dataType).catalogString,
+          nullable = true,
+          Option(col.STRING).map(string))
+      }
+    }
+  }
+}
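The Carbon branch of visitCreateTable above is taken only by CREATE TABLE statements whose file format is 'carbondata' (or 'org.apache.carbondata.format'); everything else is delegated to the stock SparkSqlAstBuilder, and statements that fail the Spark grammar entirely are retried with the Carbon DDL parser. A minimal sketch of such a statement, assuming spark was built through the CarbonSession extension shown near the end of this mail (table name and properties are illustrative, not part of this commit):

    // Routed into the Carbon branch because the file storage is 'carbondata'.
    spark.sql(
      """CREATE TABLE IF NOT EXISTS sales (
        |  id INT,
        |  name STRING,
        |  amount DECIMAL(10, 2)
        |)
        |STORED BY 'carbondata'
        |TBLPROPERTIES ('DICTIONARY_INCLUDE'='name')""".stripMargin)
    // visitCreateTable rejects TEMPORARY, SKEWED BY and CLUSTERED BY variants, rewrites
    // the DECIMAL(10, 2) column to a Field with dataType "decimal" (precision 10, scale 2),
    // validates the TBLPROPERTIES, and returns a CreateTable(tableModel) plan.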

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index c84882e..399b3e6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -43,7 +43,7 @@ object CleanFiles {
     val storePath = TableAPIUtil.escape(args(0))
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
-    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.init(spark)
     cleanFiles(spark, dbName, tableName, storePath)
   }
 }
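The same one-line change recurs in the Compaction, DeleteSegmentByDate, DeleteSegmentById, ShowSegments and TableLoader hunks below: at these call sites CarbonEnv.init is now passed the SparkSession itself rather than its SQLContext. Sketch of the updated pattern, reusing the CleanFiles arguments from the hunk above:

    val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
    CarbonEnv.init(spark)   // was CarbonEnv.init(spark.sqlContext)
    cleanFiles(spark, dbName, tableName, storePath)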

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 1e891fd..2db6e48 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -41,7 +41,7 @@ object Compaction {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val compactionType = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
-    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.init(spark)
     compaction(spark, dbName, tableName, compactionType)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index ae95bf6..951cd7f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -43,7 +43,7 @@ object DeleteSegmentByDate {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val dateValue = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
-    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.init(spark)
     deleteSegmentByDate(spark, dbName, tableName, dateValue)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index d5a6861..dad9f59 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -48,7 +48,7 @@ object DeleteSegmentById {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
-    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.init(spark)
     deleteSegmentById(spark, dbName, tableName, segmentIds)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index 1a02c8c..c953089 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -75,7 +75,7 @@ object ShowSegments {
       None
     }
     val spark = TableAPIUtil.spark(storePath, s"ShowSegments: $dbName.$tableName")
-    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.init(spark)
     val rows = showSegments(spark, dbName, tableName, limit)
     System.out.println(showString(rows))
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 8b10aa4..424d8fa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -86,7 +86,7 @@ object TableLoader {
 
     val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
 
-    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.init(spark)
     loadTable(spark, Option(dbName), tableName, inputPaths, map)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 45dcb03..4310d04 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -54,6 +54,11 @@ class QueryTest extends PlanTest {
       clean(metastoredb)
     }
 
+    CarbonProperties.getInstance()
+      .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+      .addProperty("carbon.storelocation", storeLocation)
+
+    import org.apache.spark.sql.CarbonSession._
     val spark = SparkSession
         .builder()
         .master("local")
@@ -62,17 +67,13 @@ class QueryTest extends PlanTest {
         .config("spark.sql.warehouse.dir", warehouse)
         .config("javax.jdo.option.ConnectionURL",
           s"jdbc:derby:;databaseName=$metastoredb;create=true")
-        .getOrCreate()
-
-    CarbonProperties.getInstance()
-        .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
-        .addProperty("carbon.storelocation", storeLocation)
+        .getOrCreateCarbonSession()
 
     spark.sparkContext.setLogLevel("WARN")
     spark
   }
 
-  val sc = spark.sparkContext
+  val Dsc = spark.sparkContext
 
   lazy val implicits = spark.implicits
 

