spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-18594][SQL] Name Validation of Databases/Tables
Date Mon, 28 Nov 2016 03:43:35 GMT
Repository: spark
Updated Branches:
  refs/heads/master 9c03c5646 -> 07f32c228


[SPARK-18594][SQL] Name Validation of Databases/Tables

### What changes were proposed in this pull request?
Currently, the name validation checks are limited to table creation. They are enforced by the
Analyzer rule `PreWriteCheck`.

However, table renaming and database creation have the same issues. It makes more sense to
do the checks in `SessionCatalog`. This PR is to add it into `SessionCatalog`.

### How was this patch tested?
Added test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16018 from gatorsmile/nameValidate.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07f32c22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07f32c22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07f32c22

Branch: refs/heads/master
Commit: 07f32c2283e26e86474ba8c9b50125831009a1ea
Parents: 9c03c56
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Sun Nov 27 19:43:24 2016 -0800
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Sun Nov 27 19:43:24 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   | 18 +++++++++++++
 .../catalyst/catalog/SessionCatalogSuite.scala  | 27 +++++++++++++++++++
 .../spark/sql/execution/datasources/rules.scala | 28 +++++---------------
 .../spark/sql/hive/MultiDatabaseSuite.scala     | 11 ++++----
 4 files changed, 57 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/07f32c22/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 19a8fcd..002aecb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -86,6 +86,21 @@ class SessionCatalog(
   protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)
 
   /**
+   * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
+   * i.e. if this name only contains characters, numbers, and _.
+   *
+   * This method is intended to have the same behavior of
+   * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName.
+   */
+  private def validateName(name: String): Unit = {
+    val validNameFormat = "([\\w_]+)".r
+    if (!validNameFormat.pattern.matcher(name).matches()) {
+      throw new AnalysisException(s"`$name` is not a valid name for tables/databases. " +
+        "Valid names only contain alphabet characters, numbers and _.")
+    }
+  }
+
+  /**
    * Format table name, taking into account case sensitivity.
    */
   protected[this] def formatTableName(name: String): String = {
@@ -143,6 +158,7 @@ class SessionCatalog(
         s"${globalTempViewManager.database} is a system preserved database, " +
           "you cannot create a database with this name.")
     }
+    validateName(dbName)
     val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
     externalCatalog.createDatabase(
       dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
@@ -226,6 +242,7 @@ class SessionCatalog(
   def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
     val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
+    validateName(table)
     val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
     requireDbExists(db)
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
@@ -474,6 +491,7 @@ class SessionCatalog(
       if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
         requireTableExists(TableIdentifier(oldTableName, Some(db)))
         requireTableNotExists(TableIdentifier(newTableName, Some(db)))
+        validateName(newTableName)
         externalCatalog.renameTable(db, oldTableName, newTableName)
       } else {
         if (newName.database.isDefined) {

http://git-wip-us.apache.org/repos/asf/spark/blob/07f32c22/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 52385de..da41d36 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -61,6 +61,22 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.databaseExists("does_not_exist"))
   }
 
+  def testInvalidName(func: (String) => Unit) {
+    // scalastyle:off
+    // non ascii characters are not allowed in the source code, so we disable the scalastyle.
+    val name = "砖"
+    // scalastyle:on
+    val e = intercept[AnalysisException] {
+      func(name)
+    }.getMessage
+    assert(e.contains(s"`$name` is not a valid name for tables/databases."))
+  }
+
+  test("create databases using invalid names") {
+    val catalog = new SessionCatalog(newEmptyCatalog())
+    testInvalidName(name => catalog.createDatabase(newDb(name), ignoreIfExists = true))
+  }
+
   test("get database when a database exists") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val db1 = catalog.getDatabaseMetadata("db1")
@@ -194,6 +210,11 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3"))
   }
 
+  test("create tables using invalid names") {
+    val catalog = new SessionCatalog(newEmptyCatalog())
+    testInvalidName(name => catalog.createTable(newTable(name, "db1"), ignoreIfExists = false))
+  }
+
   test("create table when database does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     // Creating table in non-existent database should always fail
@@ -309,6 +330,12 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }
 
+  test("rename tables to an invalid name") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    testInvalidName(
+      name => catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier(name)))
+  }
+
   test("rename table when database/table does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[NoSuchDatabaseException] {

http://git-wip-us.apache.org/repos/asf/spark/blob/07f32c22/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 5ba44ff..7154e3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -309,24 +309,9 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
 
   def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
 
-  // This regex is used to check if the table name and database name is valid for `CreateTable`.
-  private val validNameFormat = Pattern.compile("[\\w_]+")
-
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
-        // Since we are saving table metadata to metastore, we should make sure the table name
-        // and database name don't break some common restrictions, e.g. special chars except
-        // underscore are not allowed.
-        val tblIdent = tableDesc.identifier
-        if (!validNameFormat.matcher(tblIdent.table).matches()) {
-          failAnalysis(s"Table name ${tblIdent.table} is not a valid name for " +
-            s"metastore. Metastore only accepts table name containing characters, numbers and _.")
-        }
-        if (tblIdent.database.exists(db => !validNameFormat.matcher(db).matches())) {
-          failAnalysis(s"Database name ${tblIdent.database.get} is not a valid name for " +
-            s"metastore. Metastore only accepts table name containing characters, numbers and _.")
-        }
         if (query.isDefined &&
           mode == SaveMode.Overwrite &&
           catalog.tableExists(tableDesc.identifier)) {
@@ -334,7 +319,7 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
             // Only do the check if the table is a data source table
             // (the relation is a BaseRelation).
-            case l @ LogicalRelation(dest: BaseRelation, _, _) =>
+            case LogicalRelation(dest: BaseRelation, _, _) =>
               // Get all input data source relations of the query.
               val srcRelations = query.get.collect {
                 case LogicalRelation(src: BaseRelation, _, _) => src
@@ -347,9 +332,8 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           }
         }
 
-      case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: InsertableRelation, _, _),
-        partition, query, overwrite, ifNotExists) =>
+      case logical.InsertIntoTable(
+          l @ LogicalRelation(t: InsertableRelation, _, _), partition, query, _, _) =>
         // Right now, we do not support insert into a data source table with partition specs.
         if (partition.nonEmpty) {
           failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
@@ -367,15 +351,15 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
         }
 
       case logical.InsertIntoTable(
-        LogicalRelation(r: HadoopFsRelation, _, _), part, query, overwrite, _) =>
+        LogicalRelation(r: HadoopFsRelation, _, _), part, query, _, _) =>
         // We need to make sure the partition columns specified by users do match partition
         // columns of the relation.
         val existingPartitionColumns = r.partitionSchema.fieldNames.toSet
         val specifiedPartitionColumns = part.keySet
         if (existingPartitionColumns != specifiedPartitionColumns) {
-          failAnalysis(s"Specified partition columns " +
+          failAnalysis("Specified partition columns " +
             s"(${specifiedPartitionColumns.mkString(", ")}) " +
-            s"do not match the partition columns of the table. Please use " +
+            "do not match the partition columns of the table. Please use " +
             s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.")
         } else {
           // OK

http://git-wip-us.apache.org/repos/asf/spark/blob/07f32c22/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 9f4401a..7322465 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -269,17 +269,17 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
       val message = intercept[AnalysisException] {
         df.write.format("parquet").saveAsTable("`d:b`.`t:a`")
       }.getMessage
-      assert(message.contains("is not a valid name for metastore"))
+      assert(message.contains("Database 'd:b' not found"))
     }
 
     {
       val message = intercept[AnalysisException] {
         df.write.format("parquet").saveAsTable("`d:b`.`table`")
       }.getMessage
-      assert(message.contains("is not a valid name for metastore"))
+      assert(message.contains("Database 'd:b' not found"))
     }
 
-    withTempPath { dir =>
+    withTempDir { dir =>
       val path = dir.getCanonicalPath
 
       {
@@ -293,7 +293,8 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
             |)
             """.stripMargin)
         }.getMessage
-        assert(message.contains("is not a valid name for metastore"))
+        assert(message.contains("`t:a` is not a valid name for tables/databases. " +
+          "Valid names only contain alphabet characters, numbers and _."))
       }
 
       {
@@ -307,7 +308,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
               |)
               """.stripMargin)
         }.getMessage
-        assert(message.contains("is not a valid name for metastore"))
+        assert(message.contains("Database 'd:b' not found"))
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message