spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liancheng <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-4785] [SQL] Support udf instance ser/de...
Date Tue, 09 Dec 2014 08:34:26 GMT
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3640#discussion_r21512962
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala ---
    @@ -54,47 +54,95 @@ private[hive] abstract class HiveFunctionRegistry
         val functionClassName = functionInfo.getFunctionClass.getName
     
         if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveSimpleUdf(functionClassName, children)
    +      HiveSimpleUdf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveGenericUdf(functionClassName, children)
    +      HiveGenericUdf(new HiveFunctionCache(functionClassName), children)
         } else if (
              classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass))
{
    -      HiveGenericUdaf(functionClassName, children)
    +      HiveGenericUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
    -      HiveUdaf(functionClassName, children)
    +      HiveUdaf(new HiveFunctionCache(functionClassName), children)
         } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass))
{
    -      HiveGenericUdtf(functionClassName, Nil, children)
    +      HiveGenericUdtf(new HiveFunctionCache(functionClassName), Nil, children)
         } else {
           sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
         }
       }
     }
     
    -private[hive] trait HiveFunctionFactory {
    -  val functionClassName: String
    -
    -  def createFunction[UDFType]() =
    -    getContextOrSparkClassLoader.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
    -}
    -
    -private[hive] abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory
{
    -  self: Product =>
    +/**
    + * This class provides the UDF creation and also the UDF instance serialization and
    + * de-serialization cross process boundary.
    + *
    + * We use class instead of trait, seems property variables of trait cannot be serialized
when
    + * bundled with Case Class; in the other hand, we need to intercept the UDF instance
ser/de.
    + * the "Has-a" probably better than "Is-a".
    + * @param functionClassName UDF class name
    + */
    +class HiveFunctionCache(var functionClassName: String) extends java.io.Externalizable
{
    +  // for Serialization
    +  def this() = this(null)
    +
    +  private var instance: Any = null
    +
    +  def writeExternal(out: java.io.ObjectOutput) {
    +    // output the function name
    +    out.writeUTF(functionClassName)
    +
    +    // Write a flag if instance is null or not
    +    out.writeBoolean(instance != null)
    +    if (instance != null) {
    +      // Some of the UDF are serializable, but some others are not
    +      // Hive Utilities can handle both cases
    +      val baos = new java.io.ByteArrayOutputStream()
    +      HiveShim.serializePlan(instance, baos)
    +      val functionInBytes = baos.toByteArray
    +
    +      // output the function bytes
    +      out.writeInt(functionInBytes.length)
    +      out.write(functionInBytes, 0, functionInBytes.length)
    +    }
    +  }
     
    -  type UDFType
    -  type EvaluatedType = Any
    +  def readExternal(in: java.io.ObjectInput) {
    +    // read the function name
    +    functionClassName = in.readUTF()
     
    -  def nullable = true
    +    if (in.readBoolean()) {
    +      // if the instance is not null
    +      // read the function in bytes
    +      val functionInBytesLength = in.readInt()
    +      val functionInBytes = new Array[Byte](functionInBytesLength)
    +      in.read(functionInBytes, 0, functionInBytesLength)
     
    -  lazy val function = createFunction[UDFType]()
    +      // deserialize the function object via Hive Utilities
    +      instance = HiveShim.deserializePlan(new java.io.ByteArrayInputStream(functionInBytes),
    +        getContextOrSparkClassLoader.loadClass(functionClassName))
    +    }
    +  }
     
    -  override def toString = s"$nodeName#$functionClassName(${children.mkString(",")})"
    +  def createFunction[UDFType](alwaysCreateNewInstance: Boolean = false) = {
    +    if (alwaysCreateNewInstance) {
    +      getContextOrSparkClassLoader.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
    +    } else {
    +      if (instance == null) {
    +        instance = getContextOrSparkClassLoader.loadClass(functionClassName).newInstance
    +      }
    +      instance.asInstanceOf[UDFType]
    +    }
    --- End diff --
    
    Actually, how about removing the `alwaysCreateNewInstance` argument (which is confusing)
and defining a new `HiveSimpleUdfWrapper` that overrides `createFunction` to always return
a new instance?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message