flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
Date Mon, 21 Aug 2017 06:37:02 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134776#comment-16134776
] 

ASF GitHub Bot commented on FLINK-7206:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r134137525
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
---
    @@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
     
         fieldTerm
       }
    +
    +  /**
    +    * Adds a reusable class to the member area of the generated [[Function]].
    +    */
    +  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
    +    val field =
    +      s"""
    +         |transient ${clazz.getCanonicalName} $fieldTerm = null;
    +         |""".stripMargin
    +    reusableMemberStatements.add(field)
    +  }
    +
    +  /**
    +    * Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
    +    *
    +    * @param indices indices of aggregate functions.
    +    * @param ctxTerm field name of runtime context.
    +    * @param accConfig data view config which contains id, field and StateDescriptos.
    +    * @return statements to create [[MapView]] or [[ListView]].
    +    */
    +  def addReusableDataViewConfig(
    +      indices: Range,
    +      ctxTerm: String,
    +      accConfig: Option[DataViewConfig])
    +    : String = {
    +    if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
    +      val initDataViews = new StringBuilder
    +      val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs
    +        .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
    +        .toMap[String, StateDescriptor[_, _]]
    +
    +      for (i <- indices) yield {
    +        for (spec <- accConfig.get.accSpecs(i)) yield {
    +          val dataViewField = spec.field
    +          val dataViewTypeTerm = dataViewField.getType.getCanonicalName
    +          val desc = descMapping.getOrElse(spec.id,
    +            throw new CodeGenException(s"Can not find ListView in accumulator by id:
${spec.id}"))
    +
    +          val serializedData = AggregateUtil.serialize(desc)
    +          val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview"
    +          val field =
    +            s"""
    +               |transient $dataViewTypeTerm $dataViewFieldTerm = null;
    +               |""".stripMargin
    +          reusableMemberStatements.add(field)
    +
    +          val descFieldTerm = s"${dataViewFieldTerm}_desc"
    +          val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
    +          val descDeserialize =
    +            s"""
    +               |    $descClassQualifier $descFieldTerm = ($descClassQualifier)
    +               |      ${AggregateUtil.getClass.getName.stripSuffix("$")}
    +               |      .deserialize("$serializedData");
    +             """.stripMargin
    +
    +          val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
    +            s"""
    +               |    $descDeserialize
    +               |    $dataViewFieldTerm =
    +               |      org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
    --- End diff --
    
    I think we do not need the `StateViewUtils` here, we can create a MapView using code gen
directly, because we already have the RuntimeContext and StateDescriptor. 


> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
>                 Key: FLINK-7206
>                 URL: https://issues.apache.org/jira/browse/FLINK-7206
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Kaibo Zhou
>            Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message