spark-commits mailing list archives

From: r...@apache.org
Subject: spark git commit: [SPARK-14213][SQL] Migrate HiveQl parsing to ANTLR4 parser
Date: Tue, 29 Mar 2016 03:19:23 GMT
Repository: spark
Updated Branches:
  refs/heads/master 38326cad8 -> 27d4ef0c6


[SPARK-14213][SQL] Migrate HiveQl parsing to ANTLR4 parser

### What changes were proposed in this pull request?

This PR migrates all HiveQl parsing to the new ANTLR4 parser. This PR is built on top of https://github.com/apache/spark/pull/12011,
and we should wait to merge until that one is in (hence the WIP tag).

As soon as this PR is merged we can start removing much of the old parser infrastructure.
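
For illustration, a minimal sketch of the new entry point (assuming the HiveSqlParser object added below; parsePlan comes from the AbstractSqlParser base class it extends):

    import org.apache.spark.sql.hive.execution.HiveSqlParser

    // Parse HiveQl text into a LogicalPlan via the ANTLR4-based parser.
    val plan = HiveSqlParser.parsePlan("SELECT key, count(*) FROM src GROUP BY key")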

### How was this patch tested?

Existing Hive unit tests.

cc rxin andrewor14 yhuai

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #12015 from hvanhovell/SPARK-14213.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27d4ef0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27d4ef0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27d4ef0c

Branch: refs/heads/master
Commit: 27d4ef0c619501be592366ae8f0be77294c9687d
Parents: 38326ca
Author: Herman van Hovell <hvanhovell@questtec.nl>
Authored: Mon Mar 28 20:19:21 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Mon Mar 28 20:19:21 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/parser/ng/SqlBase.g4     |  34 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  13 +-
 .../spark/sql/hive/HiveSessionState.scala       |   3 +-
 .../sql/hive/execution/HiveSqlParser.scala      | 442 +++++++++++++++++++
 4 files changed, 488 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27d4ef0c/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/ng/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/ng/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/ng/SqlBase.g4
index e46fd9b..4e77b6d 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/ng/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/ng/SqlBase.g4
@@ -118,7 +118,9 @@ statement
     | UNCACHE TABLE identifier                                         #uncacheTable
     | CLEAR CACHE                                                      #clearCache
     | ADD identifier .*?                                               #addResource
+    | SET ROLE .*?                                                     #failNativeCommand
     | SET .*?                                                          #setConfiguration
+    | kws=unsupportedHiveNativeCommands .*?                            #failNativeCommand
     | hiveNativeCommands                                               #executeNativeCommand
     ;
 
@@ -145,7 +147,26 @@ hiveNativeCommands
     | ROLLBACK WORK?
     | SHOW PARTITIONS tableIdentifier partitionSpec?
     | DFS .*?
-    | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | REVOKE | GRANT | LOCK | UNLOCK | MSCK | EXPORT | IMPORT | LOAD) .*?
+    | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | LOCK | UNLOCK | MSCK | LOAD) .*?
+    ;
+
+unsupportedHiveNativeCommands
+    : kw1=CREATE kw2=ROLE
+    | kw1=DROP kw2=ROLE
+    | kw1=GRANT kw2=ROLE?
+    | kw1=REVOKE kw2=ROLE?
+    | kw1=SHOW kw2=GRANT
+    | kw1=SHOW kw2=ROLE kw3=GRANT?
+    | kw1=SHOW kw2=PRINCIPALS
+    | kw1=SHOW kw2=ROLES
+    | kw1=SHOW kw2=CURRENT kw3=ROLES
+    | kw1=EXPORT kw2=TABLE
+    | kw1=IMPORT kw2=TABLE
+    | kw1=SHOW kw2=COMPACTIONS
+    | kw1=SHOW kw2=CREATE kw3=TABLE
+    | kw1=SHOW kw2=TRANSACTIONS
+    | kw1=SHOW kw2=INDEXES
+    | kw1=SHOW kw2=LOCKS
     ;
 
 createTableHeader
@@ -619,7 +640,8 @@ nonReserved
     | AFTER | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT
     | INPUTDRIVER | OUTPUTDRIVER | DBPROPERTIES | DFS | TRUNCATE | METADATA | REPLICATION | COMPUTE
     | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
-    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | EXPORT | IMPORT | LOAD | VALUES | COMMENT
+    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
+    | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEXES | LOCKS | OPTION
     ;
 
 SELECT: 'SELECT';
@@ -834,6 +856,14 @@ MSCK: 'MSCK';
 EXPORT: 'EXPORT';
 IMPORT: 'IMPORT';
 LOAD: 'LOAD';
+ROLE: 'ROLE';
+ROLES: 'ROLES';
+COMPACTIONS: 'COMPACTIONS';
+PRINCIPALS: 'PRINCIPALS';
+TRANSACTIONS: 'TRANSACTIONS';
+INDEXES: 'INDEXES';
+LOCKS: 'LOCKS';
+OPTION: 'OPTION';
 
 STRING
     : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
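
With these grammar changes, role and security related commands now match the #failNativeCommand alternatives instead of falling through to Hive. A sketch of the resulting behavior (assuming ParseException lives in the ng parser package, as the imports in HiveSqlParser.scala below suggest):

    import org.apache.spark.sql.catalyst.parser.ng.ParseException
    import org.apache.spark.sql.hive.execution.HiveSqlParser

    // Commands such as SHOW ROLES or GRANT now fail at parse time
    // via visitFailNativeCommand rather than being shipped to Hive.
    try {
      HiveSqlParser.parsePlan("SHOW ROLES")
    } catch {
      case e: ParseException => println(e.getMessage) // Unsupported operation: SHOW ROLES
    }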

http://git-wip-us.apache.org/repos/asf/spark/blob/27d4ef0c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index eedd12d..9a5ec98 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -85,7 +85,18 @@ private[hive] object HiveSerDe {
         HiveSerDe(
           inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
           outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")))
+          serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
+
+      "textfile" ->
+        HiveSerDe(
+          inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")),
+
+      "avro" ->
+        HiveSerDe(
+          inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
+          serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
 
     val key = source.toLowerCase match {
       case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
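
The two new entries let HiveSerDe.sourceToSerDe (used by the new parser below) resolve "textfile" and "avro" in STORED AS clauses. A sketch, assuming a HiveConf instance named hiveConf is in scope:

    import org.apache.spark.sql.hive.HiveSerDe

    // Some(HiveSerDe(...AvroContainerInputFormat, ...AvroContainerOutputFormat, ...AvroSerDe))
    val avro = HiveSerDe.sourceToSerDe("avro", hiveConf)
    // Some(HiveSerDe(TextInputFormat, IgnoreKeyTextOutputFormat)); no explicit serde.
    val text = HiveSerDe.sourceToSerDe("textfile", hiveConf)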

http://git-wip-us.apache.org/repos/asf/spark/blob/27d4ef0c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index c9b6b1d..11ef0fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.execution.{python, SparkPlanner}
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hive.execution.HiveSqlParser
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 
 
@@ -70,7 +71,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
   /**
    * Parser for HiveQl query texts.
    */
-  override lazy val sqlParser: ParserInterface = new HiveQl(conf)
+  override lazy val sqlParser: ParserInterface = HiveSqlParser
 
   /**
    * Planner that takes into account Hive-specific strategies.
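
With the parser swapped in, every statement issued through a HiveContext is now routed to the ANTLR4 builder. For example (a sketch; hiveContext is an assumed in-scope HiveContext):

    // Handled by HiveSqlAstBuilder.visitAnalyze in the new file below,
    // rather than by the old HiveQl parser.
    hiveContext.sql("ANALYZE TABLE src COMPUTE STATISTICS NOSCAN")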

http://git-wip-us.apache.org/repos/asf/spark/blob/27d4ef0c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
new file mode 100644
index 0000000..d6a08fc
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.antlr.v4.runtime.{ParserRuleContext, Token}
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry
+import org.apache.hadoop.hive.ql.parse.EximUtil
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable,
+  CatalogTableType}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.parser.ng._
+import org.apache.spark.sql.catalyst.parser.ng.SqlBaseParser._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.hive.{CreateTableAsSelect => CTAS, CreateViewAsSelect => CreateView}
+import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveSerDe}
+import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
+
+/**
+ * Concrete parser for HiveQl statements.
+ */
+object HiveSqlParser extends AbstractSqlParser {
+  val astBuilder = new HiveSqlAstBuilder
+
+  override protected def nativeCommand(sqlText: String): LogicalPlan = {
+    HiveNativeCommand(sqlText)
+  }
+}
+
+/**
+ * Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier.
+ */
+class HiveSqlAstBuilder extends SparkSqlAstBuilder {
+  import ParserUtils._
+
+  /**
+   * Get the current Hive Configuration.
+   */
+  private[this] def hiveConf: HiveConf = {
+    var ss = SessionState.get()
+    // SessionState is lazily initialized, so it can still be null here.
+    if (ss == null) {
+      val original = Thread.currentThread().getContextClassLoader
+      val conf = new HiveConf(classOf[SessionState])
+      conf.setClassLoader(original)
+      ss = new SessionState(conf)
+      SessionState.start(ss)
+    }
+    ss.getConf
+  }
+
+  /**
+   * Pass a command to Hive using a [[HiveNativeCommand]].
+   */
+  override def visitExecuteNativeCommand(
+      ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
+    HiveNativeCommand(command(ctx))
+  }
+
+  /**
+   * Fail an unsupported Hive native command.
+   */
+  override def visitFailNativeCommand(
+      ctx: FailNativeCommandContext): LogicalPlan = withOrigin(ctx) {
+    val keywords = if (ctx.kws != null) {
+      Seq(ctx.kws.kw1, ctx.kws.kw2, ctx.kws.kw3).filter(_ != null).map(_.getText).mkString(" ")
+    } else {
+      // SET ROLE is the exception to the rule, because we handle this before other SET commands.
+      "SET ROLE"
+    }
+    throw new ParseException(s"Unsupported operation: $keywords", ctx)
+  }
+
+  /**
+   * Create an [[AddJar]] or [[AddFile]] command depending on the requested resource.
+   */
+  override def visitAddResource(ctx: AddResourceContext): LogicalPlan = withOrigin(ctx) {
+    ctx.identifier.getText.toLowerCase match {
+      case "file" => AddFile(remainder(ctx.identifier).trim)
+      case "jar" => AddJar(remainder(ctx.identifier).trim)
+      case other => throw new ParseException(s"Unsupported resource type '$other'.", ctx)
+    }
+  }
+
+  /**
+   * Create a [[DropTable]] command.
+   */
+  override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
+    if (ctx.PURGE != null) {
+      logWarning("PURGE option is ignored.")
+    }
+    if (ctx.REPLICATION != null) {
+      logWarning("REPLICATION clause is ignored.")
+    }
+    DropTable(visitTableIdentifier(ctx.tableIdentifier).toString, ctx.EXISTS != null)
+  }
+
+  /**
+   * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
+   * options are passed on to Hive) e.g.:
+   * {{{
+   *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
+   * }}}
+   */
+  override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
+    if (ctx.partitionSpec == null &&
+      ctx.identifier != null &&
+      ctx.identifier.getText.toLowerCase == "noscan") {
+      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+    } else {
+      HiveNativeCommand(command(ctx))
+    }
+  }
+
+  /**
+   * Create a [[CreateTableAsSelect]] command.
+   */
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    if (ctx.query == null) {
+      HiveNativeCommand(command(ctx))
+    } else {
+      // Get the table header.
+      val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
+      val tableType = if (external) {
+        CatalogTableType.EXTERNAL_TABLE
+      } else {
+        CatalogTableType.MANAGED_TABLE
+      }
+
+      // Unsupported clauses.
+      if (temp) {
+        logWarning("TEMPORARY clause is ignored.")
+      }
+      if (ctx.bucketSpec != null) {
+        // TODO add this - we need cluster columns in the CatalogTable for this to work.
+        logWarning("CLUSTERED BY ... [ORDERED BY ...] INTO ... BUCKETS clause is ignored.")
+      }
+      if (ctx.skewSpec != null) {
+        logWarning("SKEWED BY ... ON ... [STORED AS DIRECTORIES] clause is ignored.")
+      }
+
+      // Create the schema.
+      val schema = Option(ctx.colTypeList).toSeq.flatMap(_.colType.asScala).map { col =>
+        CatalogColumn(
+          col.identifier.getText,
+          col.dataType.getText.toLowerCase, // TODO validate this?
+          nullable = true,
+          Option(col.STRING).map(string))
+      }
+
+      // Get the columns by which the table is partitioned.
+      val partitionCols = Option(ctx.identifierList).toSeq.flatMap(visitIdentifierList).map {
+        CatalogColumn(_, null, nullable = true, None)
+      }
+
+      // Create the storage.
+      def format(fmt: ParserRuleContext): CatalogStorageFormat = {
+        Option(fmt).map(typedVisit[CatalogStorageFormat]).getOrElse(EmptyStorageFormat)
+      }
+      // Default storage.
+      val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
+      val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf).getOrElse {
+        HiveSerDe(
+          inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+      }
+      // Defined storage.
+      val fileStorage = format(ctx.createFileFormat)
+      val rowStorage = format(ctx.rowFormat)
+      val storage = CatalogStorageFormat(
+        Option(ctx.locationSpec).map(visitLocationSpec),
+        fileStorage.inputFormat.orElse(hiveSerDe.inputFormat),
+        fileStorage.outputFormat.orElse(hiveSerDe.outputFormat),
+        rowStorage.serde.orElse(hiveSerDe.serde).orElse(fileStorage.serde),
+        rowStorage.serdeProperties ++ fileStorage.serdeProperties
+      )
+
+      val tableDesc = CatalogTable(
+        identifier = table,
+        tableType = tableType,
+        schema = schema,
+        partitionColumns = partitionCols,
+        storage = storage,
+        properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty),
+        // TODO support the sql text - have a proper location for this!
+        viewText = Option(ctx.STRING).map(string))
+      CTAS(tableDesc, plan(ctx.query), ifNotExists)
+    }
+  }
+
+  /**
+   * Create or replace a view. This creates a [[CreateViewAsSelect]] command.
+   */
+  override def visitCreateView(ctx: CreateViewContext): LogicalPlan = withOrigin(ctx) {
+    // Pass a partitioned view on to hive.
+    if (ctx.identifierList != null) {
+      HiveNativeCommand(command(ctx))
+    } else {
+      if (ctx.STRING != null) {
+        logWarning("COMMENT clause is ignored.")
+      }
+      val identifiers = Option(ctx.identifierCommentList).toSeq.flatMap(_.identifierComment.asScala)
+      val schema = identifiers.map { ic =>
+        CatalogColumn(ic.identifier.getText, null, nullable = true, Option(ic.STRING).map(string))
+      }
+      createView(
+        ctx,
+        ctx.tableIdentifier,
+        schema,
+        ctx.query,
+        Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty),
+        ctx.EXISTS != null,
+        ctx.REPLACE != null
+      )
+    }
+  }
+
+  /**
+   * Alter the query of a view. This creates a [[CreateViewAsSelect]] command.
+   */
+  override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) {
+    createView(
+      ctx,
+      ctx.tableIdentifier,
+      Seq.empty,
+      ctx.query,
+      Map.empty,
+      allowExist = false,
+      replace = true)
+  }
+
+  /**
+   * Create a [[CreateViewAsSelect]] command.
+   */
+  private def createView(
+      ctx: ParserRuleContext,
+      name: TableIdentifierContext,
+      schema: Seq[CatalogColumn],
+      query: QueryContext,
+      properties: Map[String, String],
+      allowExist: Boolean,
+      replace: Boolean): LogicalPlan = {
+    val sql = Option(source(query))
+    val tableDesc = CatalogTable(
+      identifier = visitTableIdentifier(name),
+      tableType = CatalogTableType.VIRTUAL_VIEW,
+      schema = schema,
+      storage = EmptyStorageFormat,
+      properties = properties,
+      viewOriginalText = sql,
+      viewText = sql)
+    CreateView(tableDesc, plan(query), allowExist, replace, command(ctx))
+  }
+
+  /**
+   * Create a [[Generator]]. Override this method in order to support custom Generators.
+   */
+  override protected def withGenerator(
+      name: String,
+      expressions: Seq[Expression],
+      ctx: LateralViewContext): Generator = {
+    val info = Option(FunctionRegistry.getFunctionInfo(name.toLowerCase)).getOrElse {
+      throw new ParseException(s"Couldn't find Generator function '$name'", ctx)
+    }
+    HiveGenericUDTF(name, new HiveFunctionWrapper(info.getFunctionClass.getName), expressions)
+  }
+
+  /**
+   * Create a [[HiveScriptIOSchema]].
+   */
+  override protected def withScriptIOSchema(
+      inRowFormat: RowFormatContext,
+      recordWriter: Token,
+      outRowFormat: RowFormatContext,
+      recordReader: Token,
+      schemaLess: Boolean): HiveScriptIOSchema = {
+    if (recordWriter != null || recordReader != null) {
+      logWarning("Used defined record reader/writer classes are currently ignored.")
+    }
+
+    // Decode an input/output format.
+    type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
+    def format(fmt: RowFormatContext, confVar: ConfVars): Format = fmt match {
+      case c: RowFormatDelimitedContext =>
+        // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
+        // expects a seq of pairs in which the old parsers' token names are used as keys.
+        // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
+        // retrieving the key value pairs ourselves.
+        def entry(key: String, value: Token): Seq[(String, String)] = {
+          Option(value).map(t => key -> t.getText).toSeq
+        }
+        val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
+          entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
+          entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
+          entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
+          entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
+
+        (entries, None, Seq.empty, None)
+
+      case c: RowFormatSerdeContext =>
+        // Use a serde format.
+        val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
+
+        // SPARK-10310: Special cases LazySimpleSerDe
+        val recordHandler = if (name == classOf[LazySimpleSerDe].getCanonicalName) {
+          Option(hiveConf.getVar(confVar))
+        } else {
+          None
+        }
+        (Seq.empty, Option(name), props.toSeq, recordHandler)
+
+      case null =>
+        // Use default (serde) format.
+        val name = hiveConf.getVar(ConfVars.HIVESCRIPTSERDE)
+        val props = Seq(serdeConstants.FIELD_DELIM -> "\t")
+        val recordHandler = Option(hiveConf.getVar(confVar))
+        (Nil, Option(name), props, recordHandler)
+    }
+
+    val (inFormat, inSerdeClass, inSerdeProps, reader) =
+      format(inRowFormat, ConfVars.HIVESCRIPTRECORDREADER)
+
+    val (outFormat, outSerdeClass, outSerdeProps, writer) =
+      format(outRowFormat, ConfVars.HIVESCRIPTRECORDWRITER)
+
+    HiveScriptIOSchema(
+      inFormat, outFormat,
+      inSerdeClass, outSerdeClass,
+      inSerdeProps, outSerdeProps,
+      reader, writer,
+      schemaLess)
+  }
+
+  /**
+   * Create location string.
+   */
+  override def visitLocationSpec(ctx: LocationSpecContext): String = {
+    EximUtil.relativeToAbsolutePath(hiveConf, super.visitLocationSpec(ctx))
+  }
+
+  /** Empty storage format for default values and copies. */
+  private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty)
+
+  /**
+   * Create a [[CatalogStorageFormat]]. The INPUTDRIVER and OUTPUTDRIVER clauses are currently
+   * ignored.
+   */
+  override def visitTableFileFormat(
+      ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
+    import ctx._
+    if (inDriver != null || outDriver != null) {
+      logWarning("INPUTDRIVER ... OUTPUTDRIVER ... clauses are ignored.")
+    }
+    EmptyStorageFormat.copy(
+      inputFormat = Option(string(inFmt)),
+      outputFormat = Option(string(outFmt)),
+      serde = Option(serdeCls).map(string)
+    )
+  }
+
+  /**
+   * Resolve a [[HiveSerDe]] based on the format name given.
+   */
+  override def visitGenericFileFormat(
+      ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
+    val source = ctx.identifier.getText
+    HiveSerDe.sourceToSerDe(source, hiveConf) match {
+      case Some(s) =>
+        EmptyStorageFormat.copy(
+          inputFormat = s.inputFormat,
+          outputFormat = s.outputFormat,
+          serde = s.serde)
+      case None =>
+        throw new ParseException(s"Unrecognized file format in STORED AS clause: $source",
ctx)
+    }
+  }
+
+  /**
+   * Storage Handlers are currently not supported in the statements we support (CTAS).
+   */
+  override def visitStorageHandler(ctx: StorageHandlerContext): AnyRef = withOrigin(ctx) {
+    throw new ParseException("Storage Handlers are currently unsupported.", ctx)
+  }
+
+  /**
+   * Create SERDE row format name and properties pair.
+   */
+  override def visitRowFormatSerde(
+      ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) {
+    import ctx._
+    EmptyStorageFormat.copy(
+      serde = Option(string(name)),
+      serdeProperties = Option(tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty))
+  }
+
+  /**
+   * Create a delimited row format properties object.
+   */
+  override def visitRowFormatDelimited(
+      ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) {
+    // Collect the entries if any.
+    def entry(key: String, value: Token): Seq[(String, String)] = {
+      Option(value).toSeq.map(x => key -> string(x))
+    }
+    // TODO we need proper support for the NULL format.
+    val entries = entry(serdeConstants.FIELD_DELIM, ctx.fieldsTerminatedBy) ++
+      entry(serdeConstants.SERIALIZATION_FORMAT, ctx.fieldsTerminatedBy) ++
+      entry(serdeConstants.ESCAPE_CHAR, ctx.escapedBy) ++
+      entry(serdeConstants.COLLECTION_DELIM, ctx.collectionItemsTerminatedBy) ++
+      entry(serdeConstants.MAPKEY_DELIM, ctx.keysTerminatedBy) ++
+      Option(ctx.linesSeparatedBy).toSeq.map { token =>
+        val value = string(token)
+        assert(
+          value == "\n",
+          s"LINES TERMINATED BY only supports newline '\\n' right now: $value",
+          ctx)
+        serdeConstants.LINE_DELIM -> value
+      }
+    EmptyStorageFormat.copy(serdeProperties = entries.toMap)
+  }
+}
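
A few statement kinds and the visitors that now handle them, as a rough guide to the builder above (a sketch, not exhaustive):

    import org.apache.spark.sql.hive.execution.HiveSqlParser

    // Routed to visitAddResource => AddJar
    HiveSqlParser.parsePlan("ADD JAR /some/path.jar")
    // Routed to visitDropTable => DropTable
    HiveSqlParser.parsePlan("DROP TABLE IF EXISTS t")
    // Routed to visitCreateView => CreateView (a CreateViewAsSelect)
    HiveSqlParser.parsePlan("CREATE VIEW v AS SELECT 1")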

