flink-user-zh mailing list archives

From Dian Fu <dian0511...@gmail.com>
Subject Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function
Date Wed, 13 Nov 2019 11:35:33 GMT
1) In the Table API & SQL, RuntimeContext is deliberately not exposed to users, which is why it is private.
2) For the difference between the aggregated values of two windows, check whether CEP (MATCH_RECOGNIZE in SQL) meets your needs; see the documentation:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/match_recognize.html
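
The result asked for in the scenario quoted below can be pinned down without Flink. What follows is a minimal plain-Scala sketch of the intended semantics only, not a Flink job and not a MATCH_RECOGNIZE query. The names Reading, MinuteMax, WindowDiffSketch and maxPerMinuteWithDiff are hypothetical, and it assumes non-negative epoch-millisecond timestamps and that the difference is defined only when the immediately preceding minute has data.

```scala
// Hypothetical input record; field names are assumptions for illustration.
final case class Reading(sensorId: String, timestampMs: Long, temperature: Double)

// One row per sensor and minute: the window maximum and, when the directly
// preceding minute also has data, the difference to that minute's maximum.
final case class MinuteMax(
    sensorId: String,
    windowStartMs: Long,
    maxTemp: Double,
    diffToPrev: Option[Double])

object WindowDiffSketch {
  private val WindowMs = 60000L

  def maxPerMinuteWithDiff(readings: Seq[Reading]): Seq[MinuteMax] = {
    // Step 1: tumbling one-minute windows, maximum per (sensor, window start).
    val maxByWindow: Map[(String, Long), Double] = readings
      .groupBy(r => (r.sensorId, r.timestampMs / WindowMs * WindowMs))
      .map { case (key, rs) => key -> rs.map(_.temperature).max }

    // Step 2: look up the previous minute of the same sensor and subtract.
    maxByWindow.toSeq.sortBy(_._1).map { case ((id, start), max) =>
      val prev = maxByWindow.get((id, start - WindowMs))
      MinuteMax(id, start, max, prev.map(max - _))
    }
  }
}
```

For example, readings of 20.0 and 22.5 in the first minute and 21.0 and 25.0 in the second minute for one sensor give maxima 22.5 and 25.0, with a difference of 2.5 reported on the second window and none on the first.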
> On Nov 13, 2019, at 3:35 PM, Chennet Steven <StevenChennet@live.com> wrote:
> 
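> Scenario: using Flink SQL with a one-minute window, compute each sensor's maximum temperature per minute, and also compute the difference between the current minute's maximum and the previous minute's. I wanted to write a Table UDAF that keeps the previous minute's maximum temperature in State, but the RuntimeContext inside the FunctionContext passed to the UDAF's open function is private and cannot be used. Meanwhile, a DataView is a field of the UDAF's ACC, and a new ACC is created for every window, so the previous window's result cannot be carried over to the next window through the ACC/DataView. Is my understanding correct?
> What is the recommended way in Flink SQL to compute the difference between the aggregated values of two windows?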
> 
> From stevenchen
>         webchat 38798579
> 
> From: Dian Fu<mailto:dian0511.fu@gmail.com>
> Sent: Thursday, November 7, 2019 19:41
> To: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
> Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function
> 
> You can refer to an existing example in the Flink code base: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java
>> On Nov 7, 2019, at 7:06 PM, Chennet Steven <StevenChennet@live.com> wrote:
>> 
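>> In flink-table-common of Flink 1.9 I found the DataView interface and its subclasses ListView and MapView, but I could not work out how to use them in a user-defined function.
>> Could you share a link to an example or to test code?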
>> 
>> From stevenchen
>>        webchat 38798579
>> 
>> ________________________________
>> From: wenlong.lwl <wenlong88.lwl@gmail.com>
>> Sent: Thursday, November 7, 2019 2:13:43 PM
>> To: user-zh@flink.apache.org <user-zh@flink.apache.org>
>> Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function
>> 
>> You could try 1.9, which introduced the DataView mechanism so that state can be used in the accumulator (Acc).
>> 
>> On Thu, 7 Nov 2019 at 09:22, Chennet Steven <stevenchennet@live.com> wrote:
>> 
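>>> I tried to use State in a Flink user-defined aggregate function, but found that the open function cannot obtain the RuntimeContext through the FunctionContext.
>>> How can State be used in an aggregate function?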
>>> 
>>> 
>>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
>>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>>> import org.apache.flink.api.java.typeutils.TupleTypeInfo
>>> import org.apache.flink.table.functions.{AggregateFunction, FunctionContext}
>>> import java.lang.{Iterable => JIterable}
>>> 
>>> 
>>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>>> 
>>> class IntDiffSumFunction extends AggregateFunction[Int,
>>> IntDiffSumAccumulator] {
>>> 
>>> override def open(context: FunctionContext): Unit = {
>>>   // In Flink 1.7.2 the RuntimeContext cannot be obtained here, so State cannot be initialized
>>>   //getRuntimeContext.getState(desc)
>>>   val a = this.hashCode()
>>>   print(s"hashCode:$a")
>>>   super.open(context)
>>> }
>>> 
>>> override def createAccumulator(): IntDiffSumAccumulator = {
>>>   val acc = new IntDiffSumAccumulator()
>>>   acc.f0 = 0
>>>   acc.f1 = false
>>>   acc
>>> }
>>> 
>>> def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>>>   accumulator.f0 += value
>>>   accumulator.f1 = true
>>> }
>>> 
>>> override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>>>   if (accumulator.f1) {
>>> 
>>>     accumulator.f0
>>>   } else {
>>>     Int.MinValue
>>>   }
>>> }
>>> 
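>>> def merge(acc: IntDiffSumAccumulator, its: JIterable[IntDiffSumAccumulator]): Unit = {
>>>   val iter = its.iterator()
>>>   // Stop when the iterator is exhausted; while (true) would run past the end
>>>   // and throw NoSuchElementException.
>>>   while (iter.hasNext) {
>>>     val a = iter.next()
>>>     if (a.f1) {
>>>       acc.f0 += a.f0
>>>       acc.f1 = true
>>>     }
>>>   }
>>> }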
>>> 
>>> def resetAccumulator(acc: IntDiffSumAccumulator) = {
>>>   acc.f0 = 0
>>>   acc.f1 = false
>>> }
>>> 
>>> override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>>>   new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
>>> BasicTypeInfo.BOOLEAN_TYPE_INFO)
>>> }
>>> 
>>> 
>>> From stevenchen
>>>        webchat 38798579
>>> 
>>> 
>>> 
> 

