spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-20303][SQL] Rename createTempFunction to registerFunction
Date Wed, 12 Apr 2017 16:01:30 GMT
Repository: spark
Updated Branches:
  refs/heads/master ceaf77ae4 -> 504e62e2f


[SPARK-20303][SQL] Rename createTempFunction to registerFunction

### What changes were proposed in this pull request?
Session catalog API `createTempFunction` is being used by Hive build-in functions, persistent
functions, and temporary functions. Thus, the name is confusing. This PR is to rename it by
`registerFunction`. Also we can move construction of `FunctionBuilder` and `ExpressionInfo`
into the new `registerFunction`, instead of duplicating the logics everywhere.

In the next PRs, the remaining Function-related APIs also need cleanups.

### How was this patch tested?
Existing test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17615 from gatorsmile/cleanupCreateTempFunction.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/504e62e2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/504e62e2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/504e62e2

Branch: refs/heads/master
Commit: 504e62e2f4b7df7e002ea014a855cebe1ff95193
Parents: ceaf77a
Author: Xiao Li <gatorsmile@gmail.com>
Authored: Wed Apr 12 09:01:26 2017 -0700
Committer: Xiao Li <gatorsmile@gmail.com>
Committed: Wed Apr 12 09:01:26 2017 -0700

----------------------------------------------------------------------
 .../analysis/AlreadyExistException.scala        |  3 --
 .../sql/catalyst/catalog/SessionCatalog.scala   | 31 +++++++--------
 .../catalyst/catalog/SessionCatalogSuite.scala  | 40 ++++++++++----------
 .../spark/sql/execution/command/functions.scala |  9 ++---
 .../spark/sql/internal/CatalogSuite.scala       |  5 ++-
 .../spark/sql/hive/HiveSessionCatalog.scala     | 18 +++------
 .../ObjectHashAggregateExecBenchmark.scala      | 10 +++--
 7 files changed, 53 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/504e62e2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
index ec56fe7..57f7a80 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
@@ -44,6 +44,3 @@ class PartitionsAlreadyExistException(db: String, table: String, specs:
Seq[Tabl
 
 class FunctionAlreadyExistsException(db: String, func: String)
   extends AnalysisException(s"Function '$func' already exists in database '$db'")
-
-class TempFunctionAlreadyExistsException(func: String)
-  extends AnalysisException(s"Temporary function '$func' already exists")

http://git-wip-us.apache.org/repos/asf/spark/blob/504e62e2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index faedf5f..1417bcc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1050,7 +1050,7 @@ class SessionCatalog(
    *
    * This performs reflection to decide what type of [[Expression]] to return in the builder.
    */
-  def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
+  protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder
= {
     // TODO: at least support UDAFs here
     throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
   }
@@ -1064,18 +1064,20 @@ class SessionCatalog(
   }
 
   /**
-   * Create a temporary function.
-   * This assumes no database is specified in `funcDefinition`.
+   * Registers a temporary or permanent function into a session-specific [[FunctionRegistry]]
    */
-  def createTempFunction(
-      name: String,
-      info: ExpressionInfo,
-      funcDefinition: FunctionBuilder,
-      ignoreIfExists: Boolean): Unit = {
-    if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists)
{
-      throw new TempFunctionAlreadyExistsException(name)
+  def registerFunction(
+      funcDefinition: CatalogFunction,
+      ignoreIfExists: Boolean,
+      functionBuilder: Option[FunctionBuilder] = None): Unit = {
+    val func = funcDefinition.identifier
+    if (functionRegistry.functionExists(func.unquotedString) && !ignoreIfExists)
{
+      throw new AnalysisException(s"Function $func already exists")
     }
-    functionRegistry.registerFunction(name, info, funcDefinition)
+    val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName)
+    val builder =
+      functionBuilder.getOrElse(makeFunctionBuilder(func.unquotedString, funcDefinition.className))
+    functionRegistry.registerFunction(func.unquotedString, info, builder)
   }
 
   /**
@@ -1180,12 +1182,7 @@ class SessionCatalog(
     // catalog. So, it is possible that qualifiedName is not exactly the same as
     // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
     // At here, we preserve the input from the user.
-    val info = new ExpressionInfo(
-      catalogFunction.className,
-      qualifiedName.database.orNull,
-      qualifiedName.funcName)
-    val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className)
-    createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false)
+    registerFunction(catalogFunction.copy(identifier = qualifiedName), ignoreIfExists = false)
     // Now, we need to create the Expression.
     functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/504e62e2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 9ba846f..be89030 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -1162,10 +1162,10 @@ abstract class SessionCatalogSuite extends PlanTest {
     withBasicCatalog { catalog =>
       val tempFunc1 = (e: Seq[Expression]) => e.head
       val tempFunc2 = (e: Seq[Expression]) => e.last
-      val info1 = new ExpressionInfo("tempFunc1", "temp1")
-      val info2 = new ExpressionInfo("tempFunc2", "temp2")
-      catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false)
-      catalog.createTempFunction("temp2", info2, tempFunc2, ignoreIfExists = false)
+      catalog.registerFunction(
+        newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1))
+      catalog.registerFunction(
+        newFunc("temp2", None), ignoreIfExists = false, functionBuilder = Some(tempFunc2))
       val arguments = Seq(Literal(1), Literal(2), Literal(3))
       assert(catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(1))
       assert(catalog.lookupFunction(FunctionIdentifier("temp2"), arguments) === Literal(3))
@@ -1174,13 +1174,15 @@ abstract class SessionCatalogSuite extends PlanTest {
         catalog.lookupFunction(FunctionIdentifier("temp3"), arguments)
       }
       val tempFunc3 = (e: Seq[Expression]) => Literal(e.size)
-      val info3 = new ExpressionInfo("tempFunc3", "temp1")
       // Temporary function already exists
-      intercept[TempFunctionAlreadyExistsException] {
-        catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = false)
-      }
+      val e = intercept[AnalysisException] {
+        catalog.registerFunction(
+          newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc3))
+      }.getMessage
+      assert(e.contains("Function temp1 already exists"))
       // Temporary function is overridden
-      catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = true)
+      catalog.registerFunction(
+        newFunc("temp1", None), ignoreIfExists = true, functionBuilder = Some(tempFunc3))
       assert(
         catalog.lookupFunction(
           FunctionIdentifier("temp1"), arguments) === Literal(arguments.length))
@@ -1193,8 +1195,8 @@ abstract class SessionCatalogSuite extends PlanTest {
       assert(!catalog.isTemporaryFunction(FunctionIdentifier("temp1")))
 
       val tempFunc1 = (e: Seq[Expression]) => e.head
-      val info1 = new ExpressionInfo("tempFunc1", "temp1")
-      catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false)
+      catalog.registerFunction(
+        newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1))
 
       // Returns true when the function is temporary
       assert(catalog.isTemporaryFunction(FunctionIdentifier("temp1")))
@@ -1243,9 +1245,9 @@ abstract class SessionCatalogSuite extends PlanTest {
 
   test("drop temp function") {
     withBasicCatalog { catalog =>
-      val info = new ExpressionInfo("tempFunc", "func1")
       val tempFunc = (e: Seq[Expression]) => e.head
-      catalog.createTempFunction("func1", info, tempFunc, ignoreIfExists = false)
+      catalog.registerFunction(
+        newFunc("func1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc))
       val arguments = Seq(Literal(1), Literal(2), Literal(3))
       assert(catalog.lookupFunction(FunctionIdentifier("func1"), arguments) === Literal(1))
       catalog.dropTempFunction("func1", ignoreIfNotExists = false)
@@ -1284,9 +1286,9 @@ abstract class SessionCatalogSuite extends PlanTest {
 
   test("lookup temp function") {
     withBasicCatalog { catalog =>
-      val info1 = new ExpressionInfo("tempFunc1", "func1")
       val tempFunc1 = (e: Seq[Expression]) => e.head
-      catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
+      catalog.registerFunction(
+        newFunc("func1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1))
       assert(catalog.lookupFunction(
         FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3))) == Literal(1))
       catalog.dropTempFunction("func1", ignoreIfNotExists = false)
@@ -1298,14 +1300,14 @@ abstract class SessionCatalogSuite extends PlanTest {
 
   test("list functions") {
     withBasicCatalog { catalog =>
-      val info1 = new ExpressionInfo("tempFunc1", "func1")
-      val info2 = new ExpressionInfo("tempFunc2", "yes_me")
+      val funcMeta1 = newFunc("func1", None)
+      val funcMeta2 = newFunc("yes_me", None)
       val tempFunc1 = (e: Seq[Expression]) => e.head
       val tempFunc2 = (e: Seq[Expression]) => e.last
       catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
       catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false)
-      catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
-      catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false)
+      catalog.registerFunction(funcMeta1, ignoreIfExists = false, functionBuilder = Some(tempFunc1))
+      catalog.registerFunction(funcMeta2, ignoreIfExists = false, functionBuilder = Some(tempFunc2))
       assert(catalog.listFunctions("db1", "*").map(_._1).toSet ==
         Set(FunctionIdentifier("func1"),
           FunctionIdentifier("yes_me")))

http://git-wip-us.apache.org/repos/asf/spark/blob/504e62e2/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 5687f93..e0d0029 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -51,6 +51,7 @@ case class CreateFunctionCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className,
resources)
     if (isTemp) {
       if (databaseName.isDefined) {
         throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION
" +
@@ -59,17 +60,13 @@ case class CreateFunctionCommand(
       // We first load resources and then put the builder in the function registry.
       // Please note that it is allowed to overwrite an existing temp function.
       catalog.loadFunctionResources(resources)
-      val info = new ExpressionInfo(className, functionName)
-      val builder = catalog.makeFunctionBuilder(functionName, className)
-      catalog.createTempFunction(functionName, info, builder, ignoreIfExists = false)
+      catalog.registerFunction(func, ignoreIfExists = false)
     } else {
       // For a permanent, we will store the metadata into underlying external catalog.
       // This function will be loaded into the FunctionRegistry when a query uses it.
       // We do not load it into FunctionRegistry right now.
       // TODO: should we also parse "IF NOT EXISTS"?
-      catalog.createFunction(
-        CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources),
-        ignoreIfExists = false)
+      catalog.createFunction(func, ignoreIfExists = false)
     }
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/504e62e2/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 6469e50..8f9c52c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -75,9 +75,10 @@ class CatalogSuite
   }
 
   private def createTempFunction(name: String): Unit = {
-    val info = new ExpressionInfo("className", name)
     val tempFunc = (e: Seq[Expression]) => e.head
-    sessionCatalog.createTempFunction(name, info, tempFunc, ignoreIfExists = false)
+    val funcMeta = CatalogFunction(FunctionIdentifier(name, None), "className", Nil)
+    sessionCatalog.registerFunction(
+      funcMeta, ignoreIfExists = false, functionBuilder = Some(tempFunc))
   }
 
   private def dropFunction(name: String, db: Option[String] = None): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/504e62e2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index c917f11..377d4f2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -31,8 +31,8 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager,
SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResourceLoader, GlobalTempViewManager,
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.internal.SQLConf
@@ -124,13 +124,6 @@ private[sql] class HiveSessionCatalog(
   }
 
   private def lookupFunction0(name: FunctionIdentifier, children: Seq[Expression]): Expression
= {
-    // TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method
to
-    // if (super.functionExists(name)) {
-    //   super.lookupFunction(name, children)
-    // } else {
-    //   // This function is a Hive builtin function.
-    //   ...
-    // }
     val database = name.database.map(formatDatabaseName)
     val funcName = name.copy(database = database)
     Try(super.lookupFunction(funcName, children)) match {
@@ -164,10 +157,11 @@ private[sql] class HiveSessionCatalog(
             }
           }
           val className = functionInfo.getFunctionClass.getName
-          val builder = makeFunctionBuilder(functionName, className)
+          val functionIdentifier =
+            FunctionIdentifier(functionName.toLowerCase(Locale.ROOT), database)
+          val func = CatalogFunction(functionIdentifier, className, Nil)
           // Put this Hive built-in function to our function registry.
-          val info = new ExpressionInfo(className, functionName)
-          createTempFunction(functionName, info, builder, ignoreIfExists = false)
+          registerFunction(func, ignoreIfExists = false)
           // Now, we need to create the Expression.
           functionRegistry.lookupFunction(functionName, children)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/504e62e2/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala
index 197110f..73383ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala
@@ -22,7 +22,9 @@ import scala.concurrent.duration._
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox
 
 import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.expressions.{ExpressionInfo, Literal}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
 import org.apache.spark.sql.hive.HiveSessionCatalog
 import org.apache.spark.sql.hive.execution.TestingTypedCount
@@ -217,9 +219,9 @@ class ObjectHashAggregateExecBenchmark extends BenchmarkBase with TestHiveSingle
 
   private def registerHiveFunction(functionName: String, clazz: Class[_]): Unit = {
     val sessionCatalog = sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
-    val builder = sessionCatalog.makeFunctionBuilder(functionName, clazz.getName)
-    val info = new ExpressionInfo(clazz.getName, functionName)
-    sessionCatalog.createTempFunction(functionName, info, builder, ignoreIfExists = false)
+    val functionIdentifier = FunctionIdentifier(functionName, database = None)
+    val func = CatalogFunction(functionIdentifier, clazz.getName, resources = Nil)
+    sessionCatalog.registerFunction(func, ignoreIfExists = false)
   }
 
   private def percentile_approx(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message