flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
Date Mon, 28 Aug 2017 22:13:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144439#comment-16144439
] 

ASF GitHub Bot commented on FLINK-7206:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r135528903
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
---
    @@ -161,13 +182,119 @@ class AggregationCodeGenerator(
             }
         }
     
    +    /**
    +      * Create DataView Term, for example, acc1_map_dataview.
    +      *
    +      * @param aggIndex index of aggregate function
    +      * @param fieldName field name of DataView
    +      * @return term to access [[MapView]] or [[ListView]]
    +      */
    +    def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
    +      s"acc${aggIndex}_${fieldName}_dataview"
    +    }
    +
    +    /**
    +      * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open,
cleanup,
    +      * close and member area of the generated function.
    +      *
    +      */
    +    def addReusableDataViews: Unit = {
    +      if (accConfig.isDefined) {
    +        val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
    +          .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
    +          .toMap[String, StateDescriptor[_, _]]
    +
    +        for (i <- aggs.indices) yield {
    +          for (spec <- accConfig.get(i)) yield {
    +            val dataViewField = spec.field
    +            val dataViewTypeTerm = dataViewField.getType.getCanonicalName
    +            val desc = descMapping.getOrElse(spec.id,
    +              throw new CodeGenException(s"Can not find DataView in accumulator by id:
${spec.id}"))
    +
    +            // define the DataView variables
    +            val serializedData = AggregateUtil.serialize(desc)
    --- End diff --
    
    move `serialize` method to this class and rename to `serializeStateDescriptor`


> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
>                 Key: FLINK-7206
>                 URL: https://issues.apache.org/jira/browse/FLINK-7206
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Kaibo Zhou
>            Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message