spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-15583][SQL] Disallow altering datasource properties
Date Fri, 27 May 2016 03:11:12 GMT
Repository: spark
Updated Branches:
  refs/heads/master 6ab973ec5 -> 3fca635b4


[SPARK-15583][SQL] Disallow altering datasource properties

## What changes were proposed in this pull request?

Certain table properties (and SerDe properties) are in the protected namespace `spark.sql.sources.`,
which we use internally for datasource tables. The user should not be allowed to:

(1) create a Hive table that sets these properties, or
(2) alter these properties on an existing table.

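Previously, we threw an exception whenever the user tried to alter the properties of an existing
datasource table, even if none of the keys were in the protected namespace. This was overly
restrictive for datasource tables, and it did nothing to protect Hive tables. The check now looks
at the keys themselves. `ALTER TABLE`/`ALTER VIEW ... SET/UNSET TBLPROPERTIES`,
`ALTER TABLE ... SET SERDEPROPERTIES` and `CREATE TABLE` reject any key starting with
`spark.sql.sources.`. Other properties can be altered on both datasource and Hive tables.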

## How was this patch tested?

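New and updated tests in DDLSuite. The expected error message in HiveDDLSuite was updated to match
the new capitalization ("Attempted to unset non-existent property ...").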

Author: Andrew Or <andrew@databricks.com>

Closes #13341 from andrewor14/alter-table-props.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fca635b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fca635b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fca635b

Branch: refs/heads/master
Commit: 3fca635b4ed322208debcd89a539e42cdde6bbd4
Parents: 6ab973e
Author: Andrew Or <andrew@databricks.com>
Authored: Thu May 26 20:11:09 2016 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Thu May 26 20:11:09 2016 -0700

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        |  17 +++
 .../spark/sql/execution/command/ddl.scala       |  37 +++--
 .../spark/sql/execution/command/tables.scala    |   2 +
 .../spark/sql/execution/command/DDLSuite.scala  | 148 ++++++++++++-------
 .../spark/sql/hive/execution/HiveDDLSuite.scala |   2 +-
 5 files changed, 139 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3fca635b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 6ca66a2..deedb68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -255,6 +255,23 @@ case class CreateDataSourceTableAsSelectCommand(
 
 
 object CreateDataSourceTableUtils extends Logging {
+
+  // TODO: Actually replace usages with these variables (SPARK-15584)
+
+  val DATASOURCE_PREFIX = "spark.sql.sources."
+  val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
+  val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
+  val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
+  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_PREFIX + "schema."
+  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
+  val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
+  val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
+  val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
+  val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
+  val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
+  val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
+  val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
+
   /**
    * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
    * i.e. if this name only contains characters, numbers, and _.

http://git-wip-us.apache.org/repos/asf/spark/blob/3fca635b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7ce7bb9..15eba3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.types._
 
@@ -228,15 +229,13 @@ case class AlterTableSetPropertiesCommand(
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val ident = if (isView) "VIEW" else "TABLE"
     val catalog = sparkSession.sessionState.catalog
     DDLUtils.verifyAlterTableType(catalog, tableName, isView)
+    DDLUtils.verifyTableProperties(properties.keys.toSeq, s"ALTER $ident")
     val table = catalog.getTableMetadata(tableName)
-    val newProperties = table.properties ++ properties
-    if (DDLUtils.isDatasourceTable(newProperties)) {
-      throw new AnalysisException("ALTER TABLE SET TBLPROPERTIES is not supported for " +
-        "tables defined using the datasource API")
-    }
-    val newTable = table.copy(properties = newProperties)
+    // This overrides old properties
+    val newTable = table.copy(properties = table.properties ++ properties)
     catalog.alterTable(newTable)
     Seq.empty[Row]
   }
@@ -260,18 +259,16 @@ case class AlterTableUnsetPropertiesCommand(
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val ident = if (isView) "VIEW" else "TABLE"
     val catalog = sparkSession.sessionState.catalog
     DDLUtils.verifyAlterTableType(catalog, tableName, isView)
+    DDLUtils.verifyTableProperties(propKeys, s"ALTER $ident")
     val table = catalog.getTableMetadata(tableName)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "alter table properties is not supported for datasource tables")
-    }
     if (!ifExists) {
       propKeys.foreach { k =>
         if (!table.properties.contains(k)) {
           throw new AnalysisException(
-            s"attempted to unset non-existent property '$k' in table '$tableName'")
+            s"Attempted to unset non-existent property '$k' in table '$tableName'")
         }
       }
     }
@@ -304,11 +301,15 @@ case class AlterTableSerDePropertiesCommand(
     "ALTER TABLE attempted to set neither serde class name nor serde properties")
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    DDLUtils.verifyTableProperties(
+      serdeProperties.toSeq.flatMap(_.keys.toSeq),
+      "ALTER TABLE SERDEPROPERTIES")
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     // Do not support setting serde for datasource tables
     if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException("ALTER TABLE SET SERDE is not supported for datasource
tables")
+      throw new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " +
+        "not supported for tables created with the datasource API")
     }
     val newTable = table.withNewStorage(
       serde = serdeClassName.orElse(table.storage.serde),
@@ -489,6 +490,18 @@ object DDLUtils {
     })
   }
 
+  /**
+   * If the given table properties (or SerDe properties) contains datasource properties,
+   * throw an exception.
+   */
+  def verifyTableProperties(propKeys: Seq[String], operation: String): Unit = {
+    val datasourceKeys = propKeys.filter(_.startsWith(DATASOURCE_PREFIX))
+    if (datasourceKeys.nonEmpty) {
+      throw new AnalysisException(s"Operation not allowed: $operation property keys may not
" +
+        s"start with '$DATASOURCE_PREFIX': ${datasourceKeys.mkString("[", ", ", "]")}")
+    }
+  }
+
   def isTablePartitioned(table: CatalogTable): Boolean = {
     table.partitionColumns.nonEmpty ||
       table.properties.contains("spark.sql.sources.schema.numPartCols")

http://git-wip-us.apache.org/repos/asf/spark/blob/3fca635b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e34beec..d102409 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -118,6 +118,8 @@ case class CreateTableLikeCommand(
 case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand
{
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    DDLUtils.verifyTableProperties(table.properties.keys.toSeq, "CREATE TABLE")
+    DDLUtils.verifyTableProperties(table.storage.serdeProperties.keys.toSeq, "CREATE TABLE")
     sparkSession.sessionState.catalog.createTable(table, ifNotExists)
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3fca635b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 6c038c7..ff56749 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -489,63 +490,19 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("alter table: set properties") {
-    val catalog = spark.sessionState.catalog
-    val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
-    assert(catalog.getTableMetadata(tableIdent).properties.isEmpty)
-    // set table properties
-    sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
-    assert(catalog.getTableMetadata(tableIdent).properties ==
-      Map("andrew" -> "or14", "kor" -> "bel"))
-    // set table properties without explicitly specifying database
-    catalog.setCurrentDatabase("dbx")
-    sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
-    assert(catalog.getTableMetadata(tableIdent).properties ==
-      Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
-    // table to alter does not exist
-    intercept[AnalysisException] {
-      sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')")
-    }
-    // throw exception for datasource tables
-    convertToDatasourceTable(catalog, tableIdent)
-    val e = intercept[AnalysisException] {
-      sql("ALTER TABLE tab1 SET TBLPROPERTIES ('sora' = 'bol')")
-    }
-    assert(e.getMessage.contains("datasource"))
+    testSetProperties(isDatasourceTable = false)
+  }
+
+  test("alter table: set properties (datasource table)") {
+    testSetProperties(isDatasourceTable = true)
   }
 
   test("alter table: unset properties") {
-    val catalog = spark.sessionState.catalog
-    val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
-    // unset table properties
-    sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan')")
-    sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
-    assert(catalog.getTableMetadata(tableIdent).properties == Map("p" -> "an", "c" ->
"lan"))
-    // unset table properties without explicitly specifying database
-    catalog.setCurrentDatabase("dbx")
-    sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')")
-    assert(catalog.getTableMetadata(tableIdent).properties == Map("c" -> "lan"))
-    // table to alter does not exist
-    intercept[AnalysisException] {
-      sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
-    }
-    // property to unset does not exist
-    val e = intercept[AnalysisException] {
-      sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('c', 'xyz')")
-    }
-    assert(e.getMessage.contains("xyz"))
-    // property to unset does not exist, but "IF EXISTS" is specified
-    sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
-    assert(catalog.getTableMetadata(tableIdent).properties.isEmpty)
-    // throw exception for datasource tables
-    convertToDatasourceTable(catalog, tableIdent)
-    val e1 = intercept[AnalysisException] {
-      sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('sora')")
-    }
-    assert(e1.getMessage.contains("datasource"))
+    testUnsetProperties(isDatasourceTable = false)
+  }
+
+  test("alter table: unset properties (datasource table)") {
+    testUnsetProperties(isDatasourceTable = true)
   }
 
   test("alter table: set serde") {
@@ -768,6 +725,78 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       properties = Map("spark.sql.sources.provider" -> "csv")))
   }
 
+  private def testSetProperties(isDatasourceTable: Boolean): Unit = {
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    def getProps: Map[String, String] = {
+      catalog.getTableMetadata(tableIdent).properties.filterKeys { k =>
+        !isDatasourceTable || !k.startsWith(DATASOURCE_PREFIX)
+      }
+    }
+    assert(getProps.isEmpty)
+    // set table properties
+    sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
+    assert(getProps == Map("andrew" -> "or14", "kor" -> "bel"))
+    // set table properties without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
+    assert(getProps == Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')")
+    }
+    // datasource table property keys are not allowed
+    val e = intercept[AnalysisException] {
+      sql(s"ALTER TABLE tab1 SET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo' = 'loser')")
+    }
+    assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
+  }
+
+  private def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    def getProps: Map[String, String] = {
+      catalog.getTableMetadata(tableIdent).properties.filterKeys { k =>
+        !isDatasourceTable || !k.startsWith(DATASOURCE_PREFIX)
+      }
+    }
+    // unset table properties
+    sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x'
= 'y')")
+    sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
+    assert(getProps == Map("p" -> "an", "c" -> "lan", "x" -> "y"))
+    // unset table properties without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')")
+    assert(getProps == Map("c" -> "lan", "x" -> "y"))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
+    }
+    // property to unset does not exist
+    val e = intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('c', 'xyz')")
+    }
+    assert(e.getMessage.contains("xyz"))
+    // property to unset does not exist, but "IF EXISTS" is specified
+    sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
+    assert(getProps == Map("x" -> "y"))
+    // datasource table property keys are not allowed
+    val e2 = intercept[AnalysisException] {
+      sql(s"ALTER TABLE tab1 UNSET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo')")
+    }
+    assert(e2.getMessage.contains(DATASOURCE_PREFIX + "foo"))
+  }
+
   private def testSetLocation(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -870,6 +899,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     intercept[AnalysisException] {
       sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
     }
+    // serde properties must not be a datasource property
+    val e = intercept[AnalysisException] {
+      sql(s"ALTER TABLE tab1 SET SERDEPROPERTIES ('${DATASOURCE_PREFIX}foo'='wah')")
+    }
+    assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
   }
 
   private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
@@ -1091,6 +1125,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  test("create table with datasource properties (not allowed)") {
+    assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me'='anything')")
+    assertUnsupported("CREATE TABLE my_tab ROW FORMAT SERDE 'serde' " +
+      "WITH SERDEPROPERTIES ('spark.sql.sources.me'='anything')")
+  }
+
   test("drop default database") {
     Seq("true", "false").foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3fca635b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e2cef38..80e6f4e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -287,7 +287,7 @@ class HiveDDLSuite
           sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
         }.getMessage
         assert(message.contains(
-          "attempted to unset non-existent property 'p' in table '`view1`'"))
+          "Attempted to unset non-existent property 'p' in table '`view1`'"))
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message