spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-16157][SQL] Add New Methods for comments in StructField and StructType
Date Wed, 29 Jun 2016 11:36:42 GMT
Repository: spark
Updated Branches:
  refs/heads/master d1e810885 -> 7ee9e39cb


[SPARK-16157][SQL] Add New Methods for comments in StructField and StructType

#### What changes were proposed in this pull request?
Based on the previous discussion with cloud-fan and hvanhovell in another related PR https://github.com/apache/spark/pull/13764#discussion_r67994276,
it looks reasonable to add convenience methods for users to add `comment` when defining `StructField`.

Currently, the column-related `comment` attribute is stored in `Metadata` of `StructField`.
For example, users can add the `comment` attribute using the following way:
```Scala
StructType(
  StructField(
    "cl1",
    IntegerType,
    nullable = false,
    new MetadataBuilder().putString("comment", "test").build()) :: Nil)
```
This PR adds more user-friendly methods for setting the `comment` attribute when defining a `StructField`.
After this change, users have three different ways to do it:
```Scala
val struct1 = (new StructType)
  .add("a", "int", true, "test1")

val struct2 = (new StructType)
  .add("c", StringType, true, "test3")

val struct3 = (new StructType)
  .add(StructField("d", StringType).withComment("test4"))
```

#### How was this patch tested?
Added test cases:
- `DataTypeSuite` tests the three new API methods;
- `DataFrameReaderWriterSuite` tests the parquet, json and csv formats using the in-memory catalog;
- `OrcQuerySuite.scala` tests the orc format using the Hive metastore.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #13860 from gatorsmile/newMethodForComment.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ee9e39c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ee9e39c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ee9e39c

Branch: refs/heads/master
Commit: 7ee9e39cb43c43d69dfe8035106f7556886e60b1
Parents: d1e8108
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Wed Jun 29 19:36:21 2016 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Wed Jun 29 19:36:21 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/parser/AstBuilder.scala  | 10 ++----
 .../apache/spark/sql/types/StructField.scala    | 18 ++++++++++
 .../org/apache/spark/sql/types/StructType.scala | 35 ++++++++++++++++++++
 .../apache/spark/sql/types/DataTypeSuite.scala  | 17 ++++++++++
 .../spark/sql/execution/command/tables.scala    |  3 +-
 .../sql/execution/command/DDLCommandSuite.scala |  3 +-
 .../apache/spark/sql/sources/DDLTestSuite.scala |  3 +-
 .../spark/sql/sources/TableScanSuite.scala      |  8 ++---
 .../sql/test/DataFrameReaderWriterSuite.scala   | 27 ++++++++++++++-
 .../spark/sql/hive/orc/OrcQuerySuite.scala      | 22 ++++++++++++
 10 files changed, 125 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7ee9e39c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index c7420a1..f2cc8d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1430,13 +1430,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) {
     import ctx._
-
-    // Add the comment to the metadata.
-    val builder = new MetadataBuilder
-    if (STRING != null) {
-      builder.putString("comment", string(STRING))
-    }
-
-    StructField(identifier.getText, typedVisit(dataType), nullable = true, builder.build())
+    val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
+    if (STRING == null) structField else structField.withComment(string(STRING))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7ee9e39c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
index 83570a5..cb8bf61 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
@@ -51,4 +51,22 @@ case class StructField(
       ("nullable" -> nullable) ~
       ("metadata" -> metadata.jsonValue)
   }
+
+  /**
+   * Updates the StructField with a new comment value.
+   */
+  def withComment(comment: String): StructField = {
+    val newMetadata = new MetadataBuilder()
+      .withMetadata(metadata)
+      .putString("comment", comment)
+      .build()
+    copy(metadata = newMetadata)
+  }
+
+  /**
+   * Return the comment of this StructField.
+   */
+  def getComment(): Option[String] = {
+    if (metadata.contains("comment")) Option(metadata.getString("comment")) else None
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7ee9e39c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 436512f..0e89f71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -171,6 +171,23 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
   }
 
   /**
+   * Creates a new [[StructType]] by adding a new field and specifying metadata.
+   * {{{
+   * val struct = (new StructType)
+   *   .add("a", IntegerType, true, "comment1")
+   *   .add("b", LongType, false, "comment2")
+   *   .add("c", StringType, true, "comment3")
+   * }}}
+   */
+  def add(
+      name: String,
+      dataType: DataType,
+      nullable: Boolean,
+      comment: String): StructType = {
+    StructType(fields :+ StructField(name, dataType, nullable).withComment(comment))
+  }
+
+  /**
    * Creates a new [[StructType]] by adding a new nullable field with no metadata where the
    * dataType is specified as a String.
    *
@@ -219,6 +236,24 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
   }
 
   /**
+   * Creates a new [[StructType]] by adding a new field and specifying metadata where the
+   * dataType is specified as a String.
+   * {{{
+   * val struct = (new StructType)
+   *   .add("a", "int", true, "comment1")
+   *   .add("b", "long", false, "comment2")
+   *   .add("c", "string", true, "comment3")
+   * }}}
+   */
+  def add(
+      name: String,
+      dataType: String,
+      nullable: Boolean,
+      comment: String): StructType = {
+    add(name, CatalystSqlParser.parseDataType(dataType), nullable, comment)
+  }
+
+  /**
    * Extracts the [[StructField]] with the given name.
    *
    * @throws IllegalArgumentException if a field with the given name does not exist

http://git-wip-us.apache.org/repos/asf/spark/blob/7ee9e39c/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index 6b85f12..688bc3e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -52,6 +52,23 @@ class DataTypeSuite extends SparkFunSuite {
     assert(StructField("b", LongType, false) === struct("b"))
   }
 
+  test("construct with add from StructField with comments") {
+    // Test creation from StructField using four different ways
+    val struct = (new StructType)
+      .add("a", "int", true, "test1")
+      .add("b", StringType, true, "test3")
+      .add(StructField("c", LongType, false).withComment("test4"))
+      .add(StructField("d", LongType))
+
+    assert(StructField("a", IntegerType, true).withComment("test1") == struct("a"))
+    assert(StructField("b", StringType, true).withComment("test3") == struct("b"))
+    assert(StructField("c", LongType, false).withComment("test4") == struct("c"))
+    assert(StructField("d", LongType) == struct("d"))
+
+    assert(struct("c").getComment() == Option("test4"))
+    assert(struct("d").getComment().isEmpty)
+  }
+
   test("construct with String DataType") {
     // Test creation with DataType as String
     val struct = (new StructType)

http://git-wip-us.apache.org/repos/asf/spark/blob/7ee9e39c/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 30dc7e8..687d69a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -528,8 +528,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
 
   private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
     schema.foreach { column =>
-      val comment =
-        if (column.metadata.contains("comment")) column.metadata.getString("comment") else ""
+      val comment = column.getComment().getOrElse("")
       append(buffer, column.name, column.dataType.simpleString, comment)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7ee9e39c/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 7b96f4c..e1a7b9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -354,8 +354,7 @@ class DDLCommandSuite extends PlanTest {
     val expected = CreateTableUsing(
       TableIdentifier("my_tab"),
       Some(new StructType()
-        .add("a", IntegerType, nullable = true,
-          new MetadataBuilder().putString("comment", s"test").build())
+        .add("a", IntegerType, nullable = true, "test")
         .add("b", StringType)),
       "parquet",
       false,

http://git-wip-us.apache.org/repos/asf/spark/blob/7ee9e39c/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
index 5a7a907..d0ad319 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
@@ -45,8 +45,7 @@ case class SimpleDDLScan(
 
   override def schema: StructType =
     StructType(Seq(
-      StructField("intType", IntegerType, nullable = false,
-        new MetadataBuilder().putString("comment", s"test comment $table").build()),
+      StructField("intType", IntegerType, nullable = false).withComment(s"test comment $table"),
       StructField("stringType", StringType, nullable = false),
       StructField("dateType", DateType, nullable = false),
       StructField("timestampType", TimestampType, nullable = false),

http://git-wip-us.apache.org/repos/asf/spark/blob/7ee9e39c/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 93116d8..d486fa8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -388,12 +388,8 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
        |)
        """.stripMargin)
 
-       val planned = sql("SELECT * FROM student").queryExecution.executedPlan
-       val comments = planned.schema.fields.map { field =>
-         if (field.metadata.contains("comment")) field.metadata.getString("comment")
-         else "NO_COMMENT"
-       }.mkString(",")
-
+    val planned = sql("SELECT * FROM student").queryExecution.executedPlan
+    val comments = planned.schema.fields.map(_.getComment().getOrElse("NO_COMMENT")).mkString(",")
     assert(comments === "SN,SA,NO_COMMENT")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7ee9e39c/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index ebbcc1d..58b1d56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -23,7 +23,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 
@@ -391,6 +391,31 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     spark.range(10).write.orc(dir)
   }
 
+  test("column nullability and comment - write and then read") {
+    import testImplicits._
+
+    Seq("json", "parquet", "csv").foreach { format =>
+      val schema = StructType(
+        StructField("cl1", IntegerType, nullable = false).withComment("test") ::
+          StructField("cl2", IntegerType, nullable = true) ::
+          StructField("cl3", IntegerType, nullable = true) :: Nil)
+      val row = Row(3, null, 4)
+      val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+
+      val tableName = "tab"
+      withTable(tableName) {
+        df.write.format(format).mode("overwrite").saveAsTable(tableName)
+        // Verify the DDL command result: DESCRIBE TABLE
+        checkAnswer(
+          sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
+          Row("cl1", "test") :: Nil)
+        // Verify the schema
+        val expectedFields = schema.fields.map(f => f.copy(nullable = true))
+        assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
+      }
+    }
+  }
+
   private def testRead(
       df: => DataFrame,
       expectedResult: Seq[String],

http://git-wip-us.apache.org/repos/asf/spark/blob/7ee9e39c/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index cd41da7..4a86987 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 case class AllDataTypesWithNonPrimitiveType(
     stringField: String,
@@ -462,4 +463,25 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       }
     }
   }
+
+  test("column nullability and comment - write and then read") {
+    val schema = (new StructType)
+      .add("cl1", IntegerType, nullable = false, comment = "test")
+      .add("cl2", IntegerType, nullable = true)
+      .add("cl3", IntegerType, nullable = true)
+    val row = Row(3, null, 4)
+    val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+
+    val tableName = "tab"
+    withTable(tableName) {
+      df.write.format("orc").mode("overwrite").saveAsTable(tableName)
+      // Verify the DDL command result: DESCRIBE TABLE
+      checkAnswer(
+        sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
+        Row("cl1", "test") :: Nil)
+      // Verify the schema
+      val expectedFields = schema.fields.map(f => f.copy(nullable = true))
+      assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message