spark-commits mailing list archives

From hvanhov...@apache.org
Subject spark git commit: [SPARK-22431][SQL] Ensure that the datatype in the schema for the table/view metadata is parseable by Spark before persisting it
Date Tue, 28 Nov 2017 21:01:08 GMT
Repository: spark
Updated Branches:
  refs/heads/master da3557429 -> a10b328db


[SPARK-22431][SQL] Ensure that the datatype in the schema for the table/view metadata is parseable
by Spark before persisting it

## What changes were proposed in this pull request?
* JIRA:  [SPARK-22431](https://issues.apache.org/jira/browse/SPARK-22431)  : Creating Permanent
view with illegal type

**Description:**
- It is possible in Spark SQL to create a permanent view that uses a nested field with an
illegal name.
- For example, if we create the following view:
```create view x as select struct('a' as `$q`, 1 as b) q```
- A simple select fails with the following exception:

```
select * from x;

org.apache.spark.SparkException: Cannot recognize hive type string: struct<$q:string,b:int>
  at org.apache.spark.sql.hive.client.HiveClientImpl$.fromHiveColumn(HiveClientImpl.scala:812)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:378)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:378)
...
```
**Issue/Analysis**: Right now, we can create a view with a schema that Spark cannot read back
from the Hive metastore. For more details, see the analysis and proposed fix options discussed
in comment 1 and comment 2 of [SPARK-22431](https://issues.apache.org/jira/browse/SPARK-22431).

**Proposed changes**:
- Fix the Hive table/view code path to check that each column's datatype is parseable by Spark
before persisting the schema in the metastore. The change is localized to HiveClientImpl and
reuses the same parsing check that fromHiveColumn performs. This fails fast and avoids writing
something to the metastore that we are unable to read back.
- Added new unit tests.
- Ran the SQL-related unit test suites (hive/test, sql/test, catalyst/test); all passed.
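
The fail-fast idea can be sketched without Spark or Hive, as a minimal model under stated assumptions. `Column`, `Metastore`, and `isParseable` below are hypothetical stand-ins rather than Spark classes, and the identifier rule is a deliberate simplification of what the real parser accepts. The actual patch calls `CatalystSqlParser.parseDataType` on the Hive type string instead.

```scala
// Dependency-free sketch of "verify before persist"; this is not Spark's code.
object VerifyBeforePersist {
  // Hypothetical column: a name plus the type string that would be stored.
  final case class Column(name: String, typeString: String)

  // Captures struct field names: the text between '<' or ',' and the next ':'.
  private val FieldName = "(?<=[<,])([^:<>,]*):".r

  // Assumption: a type string can be read back only if every unquoted field
  // name consists of letters, digits, and underscores.
  def isParseable(typeString: String): Boolean =
    FieldName.findAllMatchIn(typeString).map(_.group(1)).forall(_.matches("[A-Za-z0-9_]+"))

  // Mirrors the shape of the patch's check: throw on the first bad column.
  def verifyColumnDataType(schema: Seq[Column]): Unit =
    schema.foreach { col =>
      if (!isParseable(col.typeString)) {
        throw new IllegalArgumentException(
          "Cannot recognize hive type string: " + col.typeString)
      }
    }

  // Hypothetical in-memory metastore that validates before it writes.
  final class Metastore {
    private val tables = scala.collection.mutable.Map.empty[String, Seq[Column]]

    def createTable(name: String, schema: Seq[Column]): Unit = {
      verifyColumnDataType(schema) // fail fast, before anything is stored
      tables(name) = schema
    }

    def contains(name: String): Boolean = tables.contains(name)
  }
}
```

In this model, creating a table with `struct<$q:string,b:int>` throws before the store is touched, so nothing unreadable is left behind, while `struct<a:string,b:int>` is stored normally.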

With the fix:
```
create view x as select struct('a' as `$q`, 1 as b) q;
17/11/28 10:44:55 ERROR SparkSQLDriver: Failed in [create view x as select struct('a' as `$q`, 1 as b) q]
org.apache.spark.SparkException: Cannot recognize hive type string: struct<$q:string,b:int>
	at org.apache.spark.sql.hive.client.HiveClientImpl$.org$apache$spark$sql$hive$client$HiveClientImpl$$getSparkSQLDataType(HiveClientImpl.scala:884)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType$1.apply(HiveClientImpl.scala:906)
	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType$1.apply(HiveClientImpl.scala:906)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
...
```
## How was this patch tested?
- New unit tests have been added.

hvanhovell, please review and share your thoughts/comments. Thank you so much.

Author: Sunitha Kambhampati <skambha@us.ibm.com>

Closes #19747 from skambha/spark22431.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a10b328d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a10b328d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a10b328d

Branch: refs/heads/master
Commit: a10b328dbc056aafaa696579f9a6e2b0cb8eb25f
Parents: da35574
Author: Sunitha Kambhampati <skambha@us.ibm.com>
Authored: Tue Nov 28 22:01:01 2017 +0100
Committer: Herman van Hovell <hvanhovell@databricks.com>
Committed: Tue Nov 28 22:01:01 2017 +0100

----------------------------------------------------------------------
 .../spark/sql/execution/command/DDLSuite.scala  | 15 ++++
 .../spark/sql/hive/client/HiveClientImpl.scala  | 17 +++-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 82 ++++++++++++++++++++
 3 files changed, 111 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a10b328d/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 878f435..fdb9b2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -117,6 +117,21 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
     }
   }
 
+  test("SPARK-22431: table with nested type col with special char") {
+    withTable("t") {
+      spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING PARQUET")
+      checkAnswer(spark.table("t"), Nil)
+    }
+  }
+
+  test("SPARK-22431: view with nested type") {
+    withView("t", "v") {
+      spark.sql("CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+      checkAnswer(spark.table("t"), Row(Row("a", 1)) :: Nil)
+      spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+      checkAnswer(spark.table("t"), Row(Row("a", 1)) :: Nil)
+    }
+  }
 }
 
 abstract class DDLSuite extends QueryTest with SQLTestUtils {

http://git-wip-us.apache.org/repos/asf/spark/blob/a10b328d/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index b5a5890..47ce6ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -488,6 +488,7 @@ private[hive] class HiveClientImpl(
   }
 
   override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
+    verifyColumnDataType(table.dataSchema)
     client.createTable(toHiveTable(table, Some(userName)), ignoreIfExists)
   }
 
@@ -507,6 +508,7 @@ private[hive] class HiveClientImpl(
     // these properties are still available to the others that share the same Hive metastore.
     // If users explicitly alter these Hive-specific properties through ALTER TABLE DDL, we respect
     // these user-specified values.
+    verifyColumnDataType(table.dataSchema)
     val hiveTable = toHiveTable(
       table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName))
     // Do not use `table.qualifiedName` here because this may be a rename
@@ -520,6 +522,7 @@ private[hive] class HiveClientImpl(
       newDataSchema: StructType,
       schemaProps: Map[String, String]): Unit = withHiveState {
     val oldTable = client.getTable(dbName, tableName)
+    verifyColumnDataType(newDataSchema)
     val hiveCols = newDataSchema.map(toHiveColumn)
     oldTable.setFields(hiveCols.asJava)
 
@@ -872,15 +875,19 @@ private[hive] object HiveClientImpl {
     new FieldSchema(c.name, typeString, c.getComment().orNull)
   }
 
-  /** Builds the native StructField from Hive's FieldSchema. */
-  def fromHiveColumn(hc: FieldSchema): StructField = {
-    val columnType = try {
+  /** Get the Spark SQL native DataType from Hive's FieldSchema. */
+  private def getSparkSQLDataType(hc: FieldSchema): DataType = {
+    try {
       CatalystSqlParser.parseDataType(hc.getType)
     } catch {
       case e: ParseException =>
         throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
+  }
 
+  /** Builds the native StructField from Hive's FieldSchema. */
+  def fromHiveColumn(hc: FieldSchema): StructField = {
+    val columnType = getSparkSQLDataType(hc)
     val metadata = if (hc.getType != columnType.catalogString) {
       new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
     } else {
@@ -895,6 +902,10 @@ private[hive] object HiveClientImpl {
     Option(hc.getComment).map(field.withComment).getOrElse(field)
   }
 
+  private def verifyColumnDataType(schema: StructType): Unit = {
+    schema.foreach(col => getSparkSQLDataType(toHiveColumn(col)))
+  }
+
   private def toInputFormat(name: String) =
     Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a10b328d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d3465a6..9063ef0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -174,6 +174,88 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   test("alter datasource table add columns - partitioned - orc") {
     testAddColumnPartitioned("orc")
   }
+
+  test("SPARK-22431: illegal nested type") {
+    val queries = Seq(
+      "CREATE TABLE t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q",
+      "CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT)",
+      "CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+
+    queries.foreach(query => {
+      val err = intercept[SparkException] {
+        spark.sql(query)
+      }.getMessage
+      assert(err.contains("Cannot recognize hive type string"))
+    })
+
+    withView("v") {
+      spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+      checkAnswer(sql("SELECT q.`a`, q.b FROM v"), Row("a", 1) :: Nil)
+
+      val err = intercept[SparkException] {
+        spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+      }.getMessage
+      assert(err.contains("Cannot recognize hive type string"))
+    }
+  }
+
+  test("SPARK-22431: table with nested type") {
+    withTable("t", "x") {
+      spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING PARQUET")
+      checkAnswer(spark.table("t"), Nil)
+      spark.sql("CREATE TABLE x (q STRUCT<col1:INT, col2:STRING>, i1 INT)")
+      checkAnswer(spark.table("x"), Nil)
+    }
+  }
+
+  test("SPARK-22431: view with nested type") {
+    withView("v") {
+      spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+      checkAnswer(spark.table("v"), Row(Row("a", 1)) :: Nil)
+
+      spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `b`, 1 AS b) q1")
+      val df = spark.table("v")
+      assert("q1".equals(df.schema.fields(0).name))
+      checkAnswer(df, Row(Row("a", 1)) :: Nil)
+    }
+  }
+
+  test("SPARK-22431: alter table tests with nested types") {
+    withTable("t1", "t2", "t3") {
+      spark.sql("CREATE TABLE t1 (q STRUCT<col1:INT, col2:STRING>, i1 INT)")
+      spark.sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`col1`:STRING, col2:Int>)")
+      val newcol = spark.sql("SELECT * FROM t1").schema.fields(2).name
+      assert("newcol1".equals(newcol))
+
+      spark.sql("CREATE TABLE t2(q STRUCT<`a`:INT, col2:STRING>, i1 INT) USING PARQUET")
+      spark.sql("ALTER TABLE t2 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, col2:Int>)")
+      spark.sql("ALTER TABLE t2 ADD COLUMNS (newcol2 STRUCT<`col1`:STRING, col2:Int>)")
+
+      val df2 = spark.table("t2")
+      checkAnswer(df2, Nil)
+      assert("newcol1".equals(df2.schema.fields(2).name))
+      assert("newcol2".equals(df2.schema.fields(3).name))
+
+      spark.sql("CREATE TABLE t3(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING PARQUET")
+      spark.sql("ALTER TABLE t3 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, col2:Int>)")
+      spark.sql("ALTER TABLE t3 ADD COLUMNS (newcol2 STRUCT<`col1`:STRING, col2:Int>)")
+
+      val df3 = spark.table("t3")
+      checkAnswer(df3, Nil)
+      assert("newcol1".equals(df3.schema.fields(2).name))
+      assert("newcol2".equals(df3.schema.fields(3).name))
+    }
+  }
+
+  test("SPARK-22431: negative alter table tests with nested types") {
+    withTable("t1") {
+      spark.sql("CREATE TABLE t1 (q STRUCT<col1:INT, col2:STRING>, i1 INT)")
+      val err = intercept[SparkException] {
+        spark.sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, col2:Int>)")
+      }.getMessage
+      assert(err.contains("Cannot recognize hive type string:"))
+   }
+  }
 }
 
 class HiveDDLSuite

