spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-14410][SQL] Push functions existence check into catalog
Date Thu, 07 Apr 2016 23:23:23 GMT
Repository: spark
Updated Branches:
  refs/heads/master aa852215f -> ae1db91d1


[SPARK-14410][SQL] Push functions existence check into catalog

## What changes were proposed in this pull request?

This is a followup to #12117 and addresses some of the TODOs introduced there. In particular,
database resolution is now pushed into the session catalog, which knows about the current
database. Further, the logic for checking whether a function exists is pushed into the external
catalog.

No change in functionality is expected.

## How was this patch tested?

`SessionCatalogSuite`, `DDLSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #12198 from andrewor14/function-exists.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae1db91d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae1db91d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae1db91d

Branch: refs/heads/master
Commit: ae1db91d158d1ae62a0ab7ea74467679ca050101
Parents: aa85221
Author: Andrew Or <andrew@databricks.com>
Authored: Thu Apr 7 16:23:17 2016 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Thu Apr 7 16:23:17 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 10 ++--
 .../sql/catalyst/catalog/SessionCatalog.scala   | 59 ++++++++++----------
 .../spark/sql/catalyst/catalog/interface.scala  |  2 +
 .../catalyst/catalog/SessionCatalogSuite.scala  | 53 ++++++++++--------
 .../spark/sql/execution/command/commands.scala  |  2 +-
 .../spark/sql/execution/command/ddl.scala       | 13 +++--
 .../spark/sql/execution/command/functions.scala | 31 ++++------
 .../spark/sql/execution/command/DDLSuite.scala  | 41 +++++++-------
 .../spark/sql/hive/HiveExternalCatalog.scala    |  4 ++
 .../spark/sql/hive/client/HiveClient.scala      |  5 ++
 .../spark/sql/hive/client/HiveClientImpl.scala  | 10 +++-
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |  8 ++-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |  2 +-
 13 files changed, 126 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 5d136b6..186bbcc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -47,11 +47,6 @@ class InMemoryCatalog extends ExternalCatalog {
   // Database name -> description
   private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
 
-  private def functionExists(db: String, funcName: String): Boolean = {
-    requireDbExists(db)
-    catalog(db).functions.contains(funcName)
-  }
-
   private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
     requireTableExists(db, table)
     catalog(db).tables(table).partitions.contains(spec)
@@ -315,6 +310,11 @@ class InMemoryCatalog extends ExternalCatalog {
     catalog(db).functions(funcName)
   }
 
+  override def functionExists(db: String, funcName: String): Boolean = {
+    requireDbExists(db)
+    catalog(db).functions.contains(funcName)
+  }
+
   override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
     requireDbExists(db)
     StringUtils.filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 2acf584..7db9fd0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -95,7 +95,7 @@ class SessionCatalog(
     externalCatalog.alterDatabase(dbDefinition)
   }
 
-  def getDatabase(db: String): CatalogDatabase = {
+  def getDatabaseMetadata(db: String): CatalogDatabase = {
     externalCatalog.getDatabase(db)
   }
 
@@ -169,7 +169,7 @@ class SessionCatalog(
    * If no database is specified, assume the table is in the current database.
    * If the specified table is not found in the database then an [[AnalysisException]] is thrown.
    */
-  def getTable(name: TableIdentifier): CatalogTable = {
+  def getTableMetadata(name: TableIdentifier): CatalogTable = {
     val db = name.database.getOrElse(currentDb)
     val table = formatTableName(name.table)
     externalCatalog.getTable(db, table)
@@ -435,28 +435,37 @@ class SessionCatalog(
    * Create a metastore function in the database specified in `funcDefinition`.
    * If no such database is specified, create it in the current database.
    */
-  def createFunction(funcDefinition: CatalogFunction): Unit = {
+  def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
     val db = funcDefinition.identifier.database.getOrElse(currentDb)
-    val newFuncDefinition = funcDefinition.copy(
-      identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
-    externalCatalog.createFunction(db, newFuncDefinition)
+    val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))
+    val newFuncDefinition = funcDefinition.copy(identifier = identifier)
+    if (!functionExists(identifier)) {
+      externalCatalog.createFunction(db, newFuncDefinition)
+    } else if (!ignoreIfExists) {
+      throw new AnalysisException(s"function '$identifier' already exists in database '$db'")
+    }
   }
 
   /**
    * Drop a metastore function.
    * If no database is specified, assume the function is in the current database.
    */
-  def dropFunction(name: FunctionIdentifier): Unit = {
+  def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
     val db = name.database.getOrElse(currentDb)
-    val qualified = name.copy(database = Some(db)).unquotedString
-    if (functionRegistry.functionExists(qualified)) {
-      // If we have loaded this function into the FunctionRegistry,
-      // also drop it from there.
-      // For a permanent function, because we loaded it to the FunctionRegistry
-      // when it's first used, we also need to drop it from the FunctionRegistry.
-      functionRegistry.dropFunction(qualified)
+    val identifier = name.copy(database = Some(db))
+    if (functionExists(identifier)) {
+      // TODO: registry should just take in FunctionIdentifier for type safety
+      if (functionRegistry.functionExists(identifier.unquotedString)) {
+        // If we have loaded this function into the FunctionRegistry,
+        // also drop it from there.
+        // For a permanent function, because we loaded it to the FunctionRegistry
+        // when it's first used, we also need to drop it from the FunctionRegistry.
+        functionRegistry.dropFunction(identifier.unquotedString)
+      }
+      externalCatalog.dropFunction(db, name.funcName)
+    } else if (!ignoreIfNotExists) {
+      throw new AnalysisException(s"function '$identifier' does not exist in database '$db'")
     }
-    externalCatalog.dropFunction(db, name.funcName)
   }
 
   /**
@@ -465,8 +474,7 @@ class SessionCatalog(
    * If a database is specified in `name`, this will return the function in that database.
    * If no database is specified, this will return the function in the current database.
    */
-  // TODO: have a better name. This method is actually for fetching the metadata of a function.
-  def getFunction(name: FunctionIdentifier): CatalogFunction = {
+  def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
     val db = name.database.getOrElse(currentDb)
     externalCatalog.getFunction(db, name.funcName)
   }
@@ -475,20 +483,9 @@ class SessionCatalog(
    * Check if the specified function exists.
    */
   def functionExists(name: FunctionIdentifier): Boolean = {
-    if (functionRegistry.functionExists(name.unquotedString)) {
-      // This function exists in the FunctionRegistry.
-      true
-    } else {
-      // Need to check if this function exists in the metastore.
-      try {
-        // TODO: It's better to ask external catalog if this function exists.
-        // So, we can avoid of having this hacky try/catch block.
-        getFunction(name) != null
-      } catch {
-        case _: NoSuchFunctionException => false
-        case _: AnalysisException => false // HiveExternalCatalog wraps all exceptions with it.
-      }
-    }
+    val db = name.database.getOrElse(currentDb)
+    functionRegistry.functionExists(name.unquotedString) ||
+      externalCatalog.functionExists(db, name.funcName)
   }
 
   // ----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 97b9946..e29d6bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -152,6 +152,8 @@ abstract class ExternalCatalog {
 
   def getFunction(db: String, funcName: String): CatalogFunction
 
+  def functionExists(db: String, funcName: String): Boolean
+
   def listFunctions(db: String, pattern: String): Seq[String]
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 4d56d00..1850dc8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -62,7 +62,7 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("get database when a database exists") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val db1 = catalog.getDatabase("db1")
+    val db1 = catalog.getDatabaseMetadata("db1")
     assert(db1.name == "db1")
     assert(db1.description.contains("db1"))
   }
@@ -70,7 +70,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("get database should throw exception when the database does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
-      catalog.getDatabase("db_that_does_not_exist")
+      catalog.getDatabaseMetadata("db_that_does_not_exist")
     }
   }
 
@@ -128,10 +128,10 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("alter database") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val db1 = catalog.getDatabase("db1")
+    val db1 = catalog.getDatabaseMetadata("db1")
     // Note: alter properties here because Hive does not support altering other fields
     catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
-    val newDb1 = catalog.getDatabase("db1")
+    val newDb1 = catalog.getDatabaseMetadata("db1")
     assert(db1.properties.isEmpty)
     assert(newDb1.properties.size == 2)
     assert(newDb1.properties.get("k") == Some("v3"))
@@ -346,21 +346,21 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("get table") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    assert(sessionCatalog.getTable(TableIdentifier("tbl1", Some("db2")))
+    assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
       == externalCatalog.getTable("db2", "tbl1"))
     // Get table without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
-    assert(sessionCatalog.getTable(TableIdentifier("tbl1"))
+    assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1"))
       == externalCatalog.getTable("db2", "tbl1"))
   }
 
   test("get table when database/table does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
-      catalog.getTable(TableIdentifier("tbl1", Some("unknown_db")))
+      catalog.getTableMetadata(TableIdentifier("tbl1", Some("unknown_db")))
     }
     intercept[AnalysisException] {
-      catalog.getTable(TableIdentifier("unknown_table", Some("db2")))
+      catalog.getTableMetadata(TableIdentifier("unknown_table", Some("db2")))
     }
   }
 
@@ -386,7 +386,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("lookup table relation with alias") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val alias = "monster"
-    val tableMetadata = catalog.getTable(TableIdentifier("tbl1", Some("db2")))
+    val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
     val relation = SubqueryAlias("tbl1", CatalogRelation("db2", tableMetadata))
     val relationWithAlias =
       SubqueryAlias(alias,
@@ -659,26 +659,28 @@ class SessionCatalogSuite extends SparkFunSuite {
     val externalCatalog = newEmptyCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
     sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
-    sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")))
+    sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")), ignoreIfExists = false)
     assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
     // Create function without explicitly specifying database
     sessionCatalog.setCurrentDatabase("mydb")
-    sessionCatalog.createFunction(newFunc("myfunc2"))
+    sessionCatalog.createFunction(newFunc("myfunc2"), ignoreIfExists = false)
     assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2"))
   }
 
   test("create function when database does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
-      catalog.createFunction(newFunc("func5", Some("does_not_exist")))
+      catalog.createFunction(
+        newFunc("func5", Some("does_not_exist")), ignoreIfExists = false)
     }
   }
 
   test("create function that already exists") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
-      catalog.createFunction(newFunc("func1", Some("db2")))
+      catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = false)
     }
+    catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = true)
   }
 
   test("create temp function") {
@@ -711,24 +713,27 @@ class SessionCatalogSuite extends SparkFunSuite {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
     assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
-    sessionCatalog.dropFunction(FunctionIdentifier("func1", Some("db2")))
+    sessionCatalog.dropFunction(
+      FunctionIdentifier("func1", Some("db2")), ignoreIfNotExists = false)
     assert(externalCatalog.listFunctions("db2", "*").isEmpty)
     // Drop function without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
-    sessionCatalog.createFunction(newFunc("func2", Some("db2")))
+    sessionCatalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
     assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func2"))
-    sessionCatalog.dropFunction(FunctionIdentifier("func2"))
+    sessionCatalog.dropFunction(FunctionIdentifier("func2"), ignoreIfNotExists = false)
     assert(externalCatalog.listFunctions("db2", "*").isEmpty)
   }
 
   test("drop function when database/function does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
-      catalog.dropFunction(FunctionIdentifier("something", Some("does_not_exist")))
+      catalog.dropFunction(
+        FunctionIdentifier("something", Some("does_not_exist")), ignoreIfNotExists = false)
     }
     intercept[AnalysisException] {
-      catalog.dropFunction(FunctionIdentifier("does_not_exist"))
+      catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = false)
     }
+    catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = true)
   }
 
   test("drop temp function") {
@@ -753,19 +758,19 @@ class SessionCatalogSuite extends SparkFunSuite {
     val expected =
       CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
       Seq.empty[(String, String)])
-    assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == expected)
+    assert(catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("db2"))) == expected)
     // Get function without explicitly specifying database
     catalog.setCurrentDatabase("db2")
-    assert(catalog.getFunction(FunctionIdentifier("func1")) == expected)
+    assert(catalog.getFunctionMetadata(FunctionIdentifier("func1")) == expected)
   }
 
   test("get function when database/function does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
-      catalog.getFunction(FunctionIdentifier("func1", Some("does_not_exist")))
+      catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("does_not_exist")))
     }
     intercept[AnalysisException] {
-      catalog.getFunction(FunctionIdentifier("does_not_exist", Some("db2")))
+      catalog.getFunctionMetadata(FunctionIdentifier("does_not_exist", Some("db2")))
     }
   }
 
@@ -787,8 +792,8 @@ class SessionCatalogSuite extends SparkFunSuite {
     val info2 = new ExpressionInfo("tempFunc2", "yes_me")
     val tempFunc1 = (e: Seq[Expression]) => e.head
     val tempFunc2 = (e: Seq[Expression]) => e.last
-    catalog.createFunction(newFunc("func2", Some("db2")))
-    catalog.createFunction(newFunc("not_me", Some("db2")))
+    catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
+    catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false)
     catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
     catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false)
     assert(catalog.listFunctions("db1", "*").toSet ==

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index faa7a2c..3fd2a93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -407,7 +407,7 @@ case class ShowTablePropertiesCommand(
     if (catalog.isTemporaryTable(table)) {
       Seq.empty[Row]
     } else {
-      val catalogTable = sqlContext.sessionState.catalog.getTable(table)
+      val catalogTable = sqlContext.sessionState.catalog.getTableMetadata(table)
 
       propertyKey match {
         case Some(p) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 6d56a6f..20779d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -124,7 +124,7 @@ case class AlterDatabaseProperties(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
-    val db: CatalogDatabase = catalog.getDatabase(databaseName)
+    val db: CatalogDatabase = catalog.getDatabaseMetadata(databaseName)
     catalog.alterDatabase(db.copy(properties = db.properties ++ props))
 
     Seq.empty[Row]
@@ -149,7 +149,8 @@ case class DescribeDatabase(
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val dbMetadata: CatalogDatabase = sqlContext.sessionState.catalog.getDatabase(databaseName)
+    val dbMetadata: CatalogDatabase =
+      sqlContext.sessionState.catalog.getDatabaseMetadata(databaseName)
     val result =
       Row("Database Name", dbMetadata.name) ::
         Row("Description", dbMetadata.description) ::
@@ -213,7 +214,7 @@ case class AlterTableSetProperties(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
-    val table = catalog.getTable(tableName)
+    val table = catalog.getTableMetadata(tableName)
     val newProperties = table.properties ++ properties
     if (DDLUtils.isDatasourceTable(newProperties)) {
       throw new AnalysisException(
@@ -243,7 +244,7 @@ case class AlterTableUnsetProperties(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
-    val table = catalog.getTable(tableName)
+    val table = catalog.getTableMetadata(tableName)
     if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
         "alter table properties is not supported for datasource tables")
@@ -286,7 +287,7 @@ case class AlterTableSerDeProperties(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
-    val table = catalog.getTable(tableName)
+    val table = catalog.getTableMetadata(tableName)
     // Do not support setting serde for datasource tables
     if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
@@ -376,7 +377,7 @@ case class AlterTableSetLocation(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
-    val table = catalog.getTable(tableName)
+    val table = catalog.getTableMetadata(tableName)
     partitionSpec match {
       case Some(spec) =>
         // Partition spec is specified, so we set the location only for this partition

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 66d17e3..c6e6017 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -47,6 +47,7 @@ case class CreateFunction(
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
     if (isTemp) {
       if (databaseName.isDefined) {
         throw new AnalysisException(
@@ -55,24 +56,18 @@ case class CreateFunction(
       }
       // We first load resources and then put the builder in the function registry.
       // Please note that it is allowed to overwrite an existing temp function.
-      sqlContext.sessionState.catalog.loadFunctionResources(resources)
+      catalog.loadFunctionResources(resources)
       val info = new ExpressionInfo(className, functionName)
-      val builder =
-        sqlContext.sessionState.catalog.makeFunctionBuilder(functionName, className)
-      sqlContext.sessionState.catalog.createTempFunction(
-        functionName, info, builder, ignoreIfExists = false)
+      val builder = catalog.makeFunctionBuilder(functionName, className)
+      catalog.createTempFunction(functionName, info, builder, ignoreIfExists = false)
     } else {
       // For a permanent, we will store the metadata into underlying external catalog.
       // This function will be loaded into the FunctionRegistry when a query uses it.
       // We do not load it into FunctionRegistry right now.
-      val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
-      val func = FunctionIdentifier(functionName, Some(dbName))
-      val catalogFunc = CatalogFunction(func, className, resources)
-      if (sqlContext.sessionState.catalog.functionExists(func)) {
-        throw new AnalysisException(
-          s"Function '$functionName' already exists in database '$dbName'.")
-      }
-      sqlContext.sessionState.catalog.createFunction(catalogFunc)
+      // TODO: should we also parse "IF NOT EXISTS"?
+      catalog.createFunction(
+        CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources),
+        ignoreIfExists = false)
     }
     Seq.empty[Row]
   }
@@ -101,13 +96,9 @@ case class DropFunction(
       catalog.dropTempFunction(functionName, ifExists)
     } else {
       // We are dropping a permanent function.
-      val dbName = databaseName.getOrElse(catalog.getCurrentDatabase)
-      val func = FunctionIdentifier(functionName, Some(dbName))
-      if (!ifExists && !catalog.functionExists(func)) {
-        throw new AnalysisException(
-          s"Function '$functionName' does not exist in database '$dbName'.")
-      }
-      catalog.dropFunction(func)
+      catalog.dropFunction(
+        FunctionIdentifier(functionName, databaseName),
+        ignoreIfNotExists = ifExists)
     }
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index a8db4e9..7084665 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -88,7 +88,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
 
         sql(s"CREATE DATABASE $dbName")
-        val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+        val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
         assert(db1 == CatalogDatabase(
           dbNameWithoutBackTicks,
           "",
@@ -110,7 +110,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       try {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
         sql(s"CREATE DATABASE $dbName")
-        val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+        val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
         assert(db1 == CatalogDatabase(
           dbNameWithoutBackTicks,
           "",
@@ -233,14 +233,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
-    assert(catalog.getTable(tableIdent).properties.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).properties.isEmpty)
     // set table properties
     sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
-    assert(catalog.getTable(tableIdent).properties == Map("andrew" -> "or14", "kor" -> "bel"))
+    assert(catalog.getTableMetadata(tableIdent).properties ==
+      Map("andrew" -> "or14", "kor" -> "bel"))
     // set table properties without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
-    assert(catalog.getTable(tableIdent).properties ==
+    assert(catalog.getTableMetadata(tableIdent).properties ==
       Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -262,11 +263,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     // unset table properties
     sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan')")
     sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
-    assert(catalog.getTable(tableIdent).properties == Map("p" -> "an", "c" -> "lan"))
+    assert(catalog.getTableMetadata(tableIdent).properties == Map("p" -> "an", "c" -> "lan"))
     // unset table properties without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')")
-    assert(catalog.getTable(tableIdent).properties == Map("c" -> "lan"))
+    assert(catalog.getTableMetadata(tableIdent).properties == Map("c" -> "lan"))
     // table to alter does not exist
     intercept[AnalysisException] {
       sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
@@ -278,7 +279,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(e.getMessage.contains("xyz"))
     // property to unset does not exist, but "IF EXISTS" is specified
     sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
-    assert(catalog.getTable(tableIdent).properties.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).properties.isEmpty)
     // throw exception for datasource tables
     convertToDatasourceTable(catalog, tableIdent)
     val e1 = intercept[AnalysisException] {
@@ -393,7 +394,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private def convertToDatasourceTable(
       catalog: SessionCatalog,
       tableIdent: TableIdentifier): Unit = {
-    catalog.alterTable(catalog.getTable(tableIdent).copy(
+    catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
       properties = Map("spark.sql.sources.provider" -> "csv")))
   }
 
@@ -407,15 +408,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
-    assert(catalog.getTable(tableIdent).storage.locationUri.isEmpty)
-    assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
     assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
     assert(catalog.getPartition(tableIdent, partSpec).storage.serdeProperties.isEmpty)
     // Verify that the location is set to the expected string
     def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = {
       val storageFormat = spec
         .map { s => catalog.getPartition(tableIdent, s).storage }
-        .getOrElse { catalog.getTable(tableIdent).storage }
+        .getOrElse { catalog.getTableMetadata(tableIdent).storage }
       if (isDatasourceTable) {
         if (spec.isDefined) {
           assert(storageFormat.serdeProperties.isEmpty)
@@ -467,8 +468,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
-    assert(catalog.getTable(tableIdent).storage.serde.isEmpty)
-    assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
     // set table serde and/or properties (should fail on datasource tables)
     if (isDatasourceTable) {
       val e1 = intercept[AnalysisException] {
@@ -482,22 +483,22 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       assert(e2.getMessage.contains("datasource"))
     } else {
       sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'")
-      assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.jadoop"))
-      assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+      assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.jadoop"))
+      assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
       sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
         "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
-      assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.madoop"))
-      assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+      assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.madoop"))
+      assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties ==
         Map("k" -> "v", "kay" -> "vee"))
     }
     // set serde properties only
     sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
-    assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+    assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties ==
       Map("k" -> "vvv", "kay" -> "vee"))
     // set things without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')")
-    assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+    assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties ==
       Map("k" -> "vvv", "kay" -> "veee"))
     // table to alter does not exist
     intercept[AnalysisException] {

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 98a5998..b1156fb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -292,6 +292,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
     client.getFunction(db, funcName)
   }
 
+  override def functionExists(db: String, funcName: String): Boolean = withClient {
+    client.functionExists(db, funcName)
+  }
+
   override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
     client.listFunctions(db, pattern)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index ee56f9d..94794b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -232,6 +232,11 @@ private[hive] trait HiveClient {
   /** Return an existing function in the database, or None if it doesn't exist. */
   def getFunctionOption(db: String, name: String): Option[CatalogFunction]
 
+  /** Return whether a function exists in the specified database. */
+  final def functionExists(db: String, name: String): Boolean = {
+    getFunctionOption(db, name).isDefined
+  }
+
   /** Return the names of all functions that match the given pattern in the database. */
   def listFunctions(db: String, pattern: String): Seq[String]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 1f66fbf..d0eb9dd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -21,7 +21,6 @@ import java.io.{File, PrintStream}
 
 import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
-import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -30,7 +29,8 @@ import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException}
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
@@ -559,7 +559,11 @@ private[hive] class HiveClientImpl(
   override def getFunctionOption(
       db: String,
       name: String): Option[CatalogFunction] = withHiveState {
-    Option(client.getFunction(db, name)).map(fromHiveFunction)
+    try {
+      Option(client.getFunction(db, name)).map(fromHiveFunction)
+    } catch {
+      case he: HiveException => None
+    }
   }
 
   override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState {

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 6967395..ada8621 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
             .saveAsTable("t")
         }
 
-        val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+        val hiveTable = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
         assert(hiveTable.storage.inputFormat === Some(inputFormat))
         assert(hiveTable.storage.outputFormat === Some(outputFormat))
         assert(hiveTable.storage.serde === Some(serde))
@@ -114,7 +114,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
               .saveAsTable("t")
           }
 
-          val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+          val hiveTable =
+            sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
           assert(hiveTable.storage.inputFormat === Some(inputFormat))
           assert(hiveTable.storage.outputFormat === Some(outputFormat))
           assert(hiveTable.storage.serde === Some(serde))
@@ -144,7 +145,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
                |AS SELECT 1 AS d1, "val_1" AS d2
              """.stripMargin)
 
-          val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+          val hiveTable =
+            sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
           assert(hiveTable.storage.inputFormat === Some(inputFormat))
           assert(hiveTable.storage.outputFormat === Some(outputFormat))
           assert(hiveTable.storage.serde === Some(serde))

http://git-wip-us.apache.org/repos/asf/spark/blob/ae1db91d/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index dd21293..c5417b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -354,7 +354,7 @@ object PermanentHiveUDFTest2 extends Logging {
       FunctionIdentifier("example_max"),
       "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
       ("JAR" -> jar) :: Nil)
-    hiveContext.sessionState.catalog.createFunction(function)
+    hiveContext.sessionState.catalog.createFunction(function, ignoreIfExists = false)
     val source =
       hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
     source.registerTempTable("sourceTable")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message