spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hvanhovell <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-4131] [SQL] Support INSERT OVERWRITE [L...
Date Mon, 23 May 2016 05:59:32 GMT
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13067#discussion_r64172078
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
---
    @@ -199,20 +200,123 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging
{
       }
     
       /**
    -   * Add an INSERT INTO [TABLE]/INSERT OVERWRITE TABLE operation to the logical plan.
    +   * A table property key can either be String or a collection of dot separated elements.
This
    +   * function extracts the property key based on whether its a string literal or a table
property
    +   * identifier.
    +   */
    +  override def visitTablePropertyKey(key: TablePropertyKeyContext): String = {
    +    if (key.STRING != null) {
    +      string(key.STRING)
    +    } else {
    +      key.getText
    +    }
    +  }
    +
    +  /**
    +   * Convert a table property list into a key-value map.
    +   */
    +  override def visitTablePropertyList(
    +      ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
    +    ctx.tableProperty.asScala.map { property =>
    +      val key = visitTablePropertyKey(property.key)
    +      val value = Option(property.value).map(string).orNull
    +      key -> value
    +    }.toMap
    +  }
    +
    +  /** Empty storage format for default values and copies. */
    +  protected val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false,
Map.empty)
    +
    +  /**
    +   * Create a [[CatalogStorageFormat]] used for creating tables.
    +   *
    +   * Example format:
    +   * {{{
    +   *   SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)]
    +   * }}}
    +   *
    +   * OR
    +   *
    +   * {{{
    +   *   DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]]
    +   *   [COLLECTION ITEMS TERMINATED BY char]
    +   *   [MAP KEYS TERMINATED BY char]
    +   *   [LINES TERMINATED BY char]
    +   *   [NULL DEFINED AS char]
    +   * }}}
    +   */
    +  protected def visitRowFormat(
    +      ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) {
    +    ctx match {
    +      case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
    +      case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
    +    }
    +  }
    +
    +  /**
    +   * Create SERDE row format name and properties pair.
    +   */
    +  override def visitRowFormatSerde(
    +      ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) {
    +    import ctx._
    +    EmptyStorageFormat.copy(
    +      serde = Option(string(name)),
    +      serdeProperties = Option(tablePropertyList).map(visitTablePropertyList)
    +        .getOrElse(Map.empty))
    +  }
    +
    +  /**
    +   * Create a delimited row format properties object.
    +   */
    +  override def visitRowFormatDelimited(
    +      ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) {
    +    // Collect the entries if any.
    +    def entry(key: String, value: Token): Seq[(String, String)] = {
    +      Option(value).toSeq.map(x => key -> string(x))
    +    }
    +    // TODO we need proper support for the NULL format.
    +    val entries =
    +      entry("field.delim", ctx.fieldsTerminatedBy) ++
    +        entry("serialization.format", ctx.fieldsTerminatedBy) ++
    +        entry("escape.delim", ctx.escapedBy) ++
    +        // The following typo is inherited from Hive...
    +        entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++
    +        entry("mapkey.delim", ctx.keysTerminatedBy) ++
    +        Option(ctx.linesSeparatedBy).toSeq.map { token =>
    +          val value = string(token)
    +          assert(
    +            value == "\n",
    +            s"LINES TERMINATED BY only supports newline '\\n' right now: $value",
    +            ctx)
    +          "line.delim" -> value
    +        }
    +    EmptyStorageFormat.copy(serdeProperties = entries.toMap)
    +  }
    +
    +  /**
    +   * Add an INSERT INTO [TABLE] / INSERT OVERWRITE TABLE / INSERT OVERWRITE DIRECTORY
    +   * operation to the logical plan.
        */
       private def withInsertInto(
           ctx: InsertIntoContext,
           query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
    -    val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
    +    val tableIdent = Option(ctx.tableIdentifier)
    +      .map(ti => Option(visitTableIdentifier(ti))).getOrElse(None)
         val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)
     
    -    InsertIntoTable(
    -      UnresolvedRelation(tableIdent, None),
    +    tableIdent.map(ti => InsertIntoTable(
    --- End diff --
    
    Please use a Pattern match or an if-else here; this is harder to read.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message