flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6888) Can not determine TypeInformation of ACC type of AggregateFunction when ACC is a Scala case/tuple class
Date Mon, 12 Jun 2017 08:17:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046319#comment-16046319
] 

ASF GitHub Bot commented on FLINK-6888:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121321436
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
---
    @@ -329,6 +337,41 @@ object UserDefinedFunctionUtils {
         }
       }
     
    +
    +  /**
    +    * Internal method of AggregateFunction#getAccumulatorType() that does some pre-checking
    +    * and uses [[TypeExtractor]] as default return type inference.
    +    */
    +  def getAccumulatorTypeOfAggregateFunction(
    +    aggregateFunction: AggregateFunction[_, _],
    --- End diff --
    
    Can we abstract a method according ` getAccumulatorTypeOfAggregateFunction` and `getResultTypeOfAggregateFunction`.
just like follows:
    
    ```
    def getTypeByMethodOrTypeExtractor(
          aggregateFunction: AggregateFunction[_, _],
          clazz: Class[_],
          methodName: String,
          candidateType: TypeInformation[_] = null): TypeInformation[_] = {
    
        val resultType = try {
          val method: Method = aggregateFunction.getClass.getMethod(methodName)
          method.invoke(aggregateFunction).asInstanceOf[TypeInformation[_]]
        } catch {
          case _: NoSuchMethodException => null
          case ite: Throwable => throw new TableException("Unexpected exception:", ite)
        }
    
        if (resultType != null) {
          resultType
        } else if(candidateType != null) {
          candidateType
        } else {
          try {
            TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[_]]
          } catch {
            case ite: InvalidTypesException =>
              throw new TableException(
                s"Cannot infer type of $clazz" +
                s"You can override AggregateFunction.$methodName to specify the type.", ite)
          }
        }
      }
    ```
    I'm not sure whether it is the best way, Please let me know what you think?


> Can not determine TypeInformation of ACC type of AggregateFunction when ACC is a Scala
case/tuple class
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6888
>                 URL: https://issues.apache.org/jira/browse/FLINK-6888
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>             Fix For: 1.4.0
>
>
> Currently the {{ACC}} TypeInformation of {{org.apache.flink.table.functions.AggregateFunction[T,
ACC]}} is extracted using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class
or tuple class, the TypeInformation will fall back to {{GenericType}} which result in bad
performance when state de/serialization. 
> I suggest to extract the ACC TypeInformation when called {{TableEnvironment.registerFunction()}}.
> Here is an example:
> {code}
> case class Accumulator(sum: Long, count: Long)
> class MyAgg extends AggregateFunction[Long, Accumulator] {
>   //Overloaded accumulate method
>   def accumulate(acc: Accumulator, value: Long): Unit = {
>   }
>   override def createAccumulator(): Accumulator = Accumulator(0, 0)
>   override def getValue(accumulator: Accumulator): Long = 1
> }
> {code}
> The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message