Date: Mon, 21 Aug 2017 06:37:02 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

    [ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134776#comment-16134776 ]

ASF GitHub Bot commented on FLINK-7206:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r134137525

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
         fieldTerm
       }
    +
    +  /**
    +    * Adds a reusable class to the member area of the generated [[Function]].
    +    */
    +  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
    +    val field =
    +      s"""
    +         |transient ${clazz.getCanonicalName} $fieldTerm = null;
    +         |""".stripMargin
    +    reusableMemberStatements.add(field)
    +  }
    +
    +  /**
    +    * Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
    +    *
    +    * @param indices indices of aggregate functions.
    +    * @param ctxTerm field name of runtime context.
    +    * @param accConfig data view config which contains id, field and StateDescriptos.
    +    * @return statements to create [[MapView]] or [[ListView]].
    +    */
    +  def addReusableDataViewConfig(
    +      indices: Range,
    +      ctxTerm: String,
    +      accConfig: Option[DataViewConfig])
    +    : String = {
    +    if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
    +      val initDataViews = new StringBuilder
    +      val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs
    +        .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
    +        .toMap[String, StateDescriptor[_, _]]
    +
    +      for (i <- indices) yield {
    +        for (spec <- accConfig.get.accSpecs(i)) yield {
    +          val dataViewField = spec.field
    +          val dataViewTypeTerm = dataViewField.getType.getCanonicalName
    +          val desc = descMapping.getOrElse(spec.id,
    +            throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
    +
    +          val serializedData = AggregateUtil.serialize(desc)
    +          val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview"
    +          val field =
    +            s"""
    +               |transient $dataViewTypeTerm $dataViewFieldTerm = null;
    +               |""".stripMargin
    +          reusableMemberStatements.add(field)
    +
    +          val descFieldTerm = s"${dataViewFieldTerm}_desc"
    +          val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
    +          val descDeserialize =
    +            s"""
    +               |    $descClassQualifier $descFieldTerm = ($descClassQualifier)
    +               |      ${AggregateUtil.getClass.getName.stripSuffix("$")}
    +               |      .deserialize("$serializedData");
    +             """.stripMargin
    +
    +          val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
    +            s"""
    +               |    $descDeserialize
    +               |    $dataViewFieldTerm =
    +               |      org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
    --- End diff --

    I think we do not need `StateViewUtils` here. We can create the MapView directly in the generated code, because we already have the RuntimeContext and the StateDescriptor.
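
One way to read this suggestion, as a minimal sketch rather than the code from the pull request: the generator could emit a Java statement that asks the runtime context for the state directly, instead of routing through a helper class. `RuntimeContext.getMapState` and `MapStateDescriptor` are existing Flink APIs. The object name `MapStateCodeGenSketch`, the method `genMapStateInit`, and the `stateFieldTerm` parameter are hypothetical names introduced only for illustration. How the resulting `MapState` is wrapped into a `MapView` is left open here, since that depends on classes in the pull request.

```scala
// Hypothetical sketch, not the code from the pull request.
object MapStateCodeGenSketch {

  // Fully qualified name of Flink's MapStateDescriptor, as it would appear in generated Java.
  private val MapStateDescType = "org.apache.flink.api.common.state.MapStateDescriptor"

  /**
    * Builds a Java statement that obtains a MapState from the runtime context.
    *
    * @param ctxTerm        field name of the RuntimeContext in the generated class
    * @param descFieldTerm  field name of the already deserialized StateDescriptor
    * @param stateFieldTerm hypothetical field name that will hold the MapState
    */
  def genMapStateInit(ctxTerm: String, descFieldTerm: String, stateFieldTerm: String): String =
    s"""
       |$stateFieldTerm = $ctxTerm.getMapState(($MapStateDescType) $descFieldTerm);
       |""".stripMargin
}
```

Because the helper only builds a string, it has no Flink dependency and can be checked in isolation, for example by calling `MapStateCodeGenSketch.genMapStateInit("ctx", "acc0_map_dataview_desc", "acc0_map_state")` and inspecting the returned statement.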

> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
>                 Key: FLINK-7206
>                 URL: https://issues.apache.org/jira/browse/FLINK-7206
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Kaibo Zhou
>            Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)