flink-user-zh mailing list archives

From Chennet Steven <stevenchen...@live.com>
Subject Re: Flink 1.7.2: how to use custom state in a custom aggregate function in the Table API
Date Thu, 14 Nov 2019 09:32:20 GMT
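Yuan, thank you very much for the reply and the approach. I tried it in code and it does work. However, when diff_temperature is computed for the earliest minute, there is no earlier minute of data, so diff_temperature comes out as the first minute's own temperature. Is there a way to set it to null instead?

Running it produces the following results:
{"deviceid":"dev1","diff_temperature":1.3,"ts":59999}     ---- can the 1.3 for this minute be set to null?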
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}
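
The thread does not answer this question. As an illustration only, the plain-Scala sketch below (no Flink dependency) shows one way to think about it: count how many rows fall inside the "1 PRECEDING .. CURRENT ROW" window and emit a difference only when there are two. The object name, the `MinuteMax` alias, and `diffWithNull` are hypothetical names introduced here. In SQL terms this would roughly correspond to adding a COUNT over the same OVER window and wrapping the subtraction in a CASE expression; whether that works on Flink 1.7.2 is an assumption that is not verified in this thread.

```scala
object FirstMinuteNullSketch {
  // Per-minute maximum for one device: (window timestamp, max temperature).
  type MinuteMax = (Long, Double)

  // Returns (ts, diff), where diff is None when there is no previous minute.
  def diffWithNull(maxima: Seq[MinuteMax]): Seq[(Long, Option[Double])] = {
    val sorted = maxima.sortBy(_._1)
    sorted.zipWithIndex.map { case ((ts, max), i) =>
      // Rows visible in "1 PRECEDING .. CURRENT ROW": one for the first row, otherwise two.
      val window = sorted.slice(math.max(0, i - 1), i + 1).map(_._2)
      val diff = if (window.size < 2) None else Some(2 * max - window.sum)
      (ts, diff)
    }
  }

  def main(args: Array[String]): Unit = {
    // Per-minute maxima taken from the results shown above.
    val maxima = Seq((59999L, 1.3), (119999L, 1.6), (179999L, 1.7))
    diffWithNull(maxima).foreach(println)
  }
}
```

With the per-minute maxima above, the first row carries no difference (None, standing in for null), while the second and third rows carry approximately 0.3 and 0.1.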


From stevenchen
         webchat 38798579

________________________________
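From: Yuan,Youjun <yuanyoujun@baidu.com>
Sent: Wednesday, November 13, 2019 11:34:53 PM
To: user-zh@flink.apache.org <user-zh@flink.apache.org>
Subject: Re: Flink 1.7.2: how to use custom state in a custom aggregate function in the Table API

This scenario should be computable with standard SQL. The rough idea is:
1. The inner query computes each device's maximum temperature per minute: max(temp) AS max_temperature plus a tumble window.
2. The outer query uses a row OVER window to get the current minute's max_temperature together with the sum of the maximum temperatures of the current and previous minute, i.e. SUM(max_temperature) AS sum_temperature.
3. The outermost query simply selects 2 * max_temperature - sum_temperature, which is the difference between the maximum temperatures of the two consecutive minutes that you need.

Assume the input messages have three fields:
ts: timestamp
deviceid: device ID
temp: device temperature

The complete SQL is as follows: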
INSERT INTO mysink
SELECT ts, deviceid, 2 * max_temperature - sum_temperature AS diff_temperature
FROM (
        SELECT deviceid, ts, max_temperature,
               SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature
        FROM (
                SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) AS ts, deviceid, max(temp) AS max_temperature
                FROM mysrc
                GROUP BY TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid
                )
        )

I used the following test data:
"20000,dev1,1.2",
"50000,dev1,1.3",
"60000,dev1,1.4",
"100000,dev1,1.5",
"110000,dev1,1.6",
"120000,dev1,1.7"
Running it produces the following results:
{"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}
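
To make the arithmetic behind the three query layers concrete, here is a minimal plain-Scala sketch (no Flink dependency) that mimics them on in-memory data. It is only an illustration of the idea, not how Flink executes the query; the names `DiffTemperatureSketch`, `Reading`, `Result`, and `diffTemperatures` are hypothetical, and the window timestamp is assumed to be the window end minus 1 ms, matching the ts values in the results above.

```scala
object DiffTemperatureSketch {
  // One input record: event time in milliseconds, device id, temperature.
  final case class Reading(ts: Long, deviceid: String, temp: Double)
  // One output row, mirroring the columns written to the sink.
  final case class Result(deviceid: String, ts: Long, diffTemperature: Double)

  val windowMillis = 60000L

  def diffTemperatures(readings: Seq[Reading]): Seq[Result] = {
    // Step 1: per device and 60-second tumbling window, take the maximum temperature.
    // The window timestamp is the window end minus 1 ms (assumed to match TUMBLE_ROWTIME).
    val maxPerWindow: Seq[(String, Long, Double)] = readings
      .groupBy(r => (r.deviceid, r.ts / windowMillis))
      .toSeq
      .map { case ((dev, idx), rs) =>
        (dev, (idx + 1) * windowMillis - 1, rs.map(_.temp).reduce(_ max _))
      }

    // Steps 2 and 3: per device, ordered by time, sum the current and the previous row
    // (ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) and emit 2 * max - sum.
    maxPerWindow
      .groupBy(_._1)
      .toSeq
      .flatMap { case (dev, rows) =>
        val sorted = rows.sortBy(_._2)
        sorted.zipWithIndex.map { case ((_, ts, max), i) =>
          val sum = if (i == 0) max else max + sorted(i - 1)._3
          Result(dev, ts, 2 * max - sum)
        }
      }
      .sortBy(r => (r.deviceid, r.ts))
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      Reading(20000L, "dev1", 1.2),
      Reading(50000L, "dev1", 1.3),
      Reading(60000L, "dev1", 1.4),
      Reading(100000L, "dev1", 1.5),
      Reading(110000L, "dev1", 1.6),
      Reading(120000L, "dev1", 1.7)
    )
    diffTemperatures(input).foreach(println)
  }
}
```

For the first window of a device the sum contains only the current row, so 2 * max - sum equals max itself. That is why the first result row shows 1.3 rather than a difference.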

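If you want to fully verify my approach, you can:
1. Log in to http://creek.baidubce.com/
2. In the job subscription input box, enter the job definition (JSON) at the end of this email.
3. Click the button that generates an executable, choose your computer's OS and CPU architecture in the dialog that pops up, and click OK.
Wait a few seconds and the system will generate a complete executable. Run it directly and you will see the results on the console. To verify more data, change the source type to STDIN so that you can enter your data from the command line.

Job definition (JSON):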
{
    "注释":{
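        "说明": "The inner query computes each device's maximum temperature per minute (max + tumble window); the outer query computes the difference between the maximum temperatures of two consecutive minutes for the same device (row OVER window). The difference is computed as: current window's maximum temperature x 2 - the sum of the maximum temperatures of the current and previous windows. This example uses preconfigured input data, i.e. source type=COLLECTION; to try more input, change type to STDIN so that data is read from standard input.",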
        "输入示例": "1000,dev1,2.3",
        "输出示例": {"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
    },
    "sources": [{
        "schema": {
            "format": "CSV",
            "fields": [{
                    "name": "ts",
                    "type": "SQL_TIMESTAMP"
                },
                {
                    "name": "deviceid",
                    "type": "STRING"
                },
                {
                    "name": "temp",
                    "type": "DOUBLE"
                }]
        },
        "watermark": 0,
        "name": "mysrc",
        "eventTime": "ts",
        "type": "COLLECTION",
        "attr": {
            "input": [
                "10000,dev1,1.1",
                "20000,dev1,1.2",
                "50000,dev1,1.3",
                "60000,dev1,1.4",
                "100000,dev1,1.5",
                "110000,dev1,1.6",
                "120000,dev1,1.7"
            ]
        }
    }],
    "sink": {
        "schema": {
            "format": "JSON"
        },
        "name": "mysink",
        "type": "STDOUT"
    },
    "name": "demojob",
    "timeType": "EVENTTIME",
    "sql": "INSERT INTO mysink SELECT ts, deviceid,  2 * max_temperature - sum_temperature AS diff_temperature FROM ( SELECT  deviceid, ts, max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS max_temperature from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid)) "
}



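-----Original Message-----
From: Chennet Steven <stevenchennet@live.com>
Sent: Wednesday, November 13, 2019 3:36 PM
To: user-zh@flink.apache.org
Subject: Re: Flink 1.7.2: how to use custom state in a custom aggregate function in the Table API

The scenario: using one-minute windows, compute each sensor's maximum temperature per minute and also the difference between the current minute's and the previous minute's maximum temperature, using Flink SQL. I wanted to write a custom UDAF for the Table API and use State inside the UDAF to store the previous minute's maximum temperature, but I found that in the FunctionContext passed to the UDAF's open function the RuntimeContext is private and cannot be used. In addition, a DataView is a property of the UDAF's ACC, and a new ACC is created for every window, so the previous window's result cannot be carried over to the next window through the ACC/DataView. Is my understanding correct?
What is the recommended way in Flink SQL to compute the difference between the aggregate values of two windows?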

From stevenchen
         webchat 38798579

From: Dian Fu<mailto:dian0511.fu@gmail.com>
Sent: Thursday, November 7, 2019 19:41
To: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
Subject: Re: Flink 1.7.2: how to use custom state in a custom aggregate function in the Table API

You can refer to an existing example in the Flink code: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java
> On Nov 7, 2019, at 7:06 PM, Chennet Steven <StevenChennet@live.com> wrote:
>
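> In flink-table-common of Flink 1.9 I found the DataView interface and its subclasses ListView and MapView, but I could not figure out how to use them in a custom function.
> Could you point me to an example or to test code?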
>
> From stevenchen
>         webchat 38798579
>
> ________________________________
> From: wenlong.lwl <wenlong88.lwl@gmail.com>
> Sent: Thursday, November 7, 2019 2:13:43 PM
> To: user-zh@flink.apache.org <user-zh@flink.apache.org>
> Subject: Re: Flink 1.7.2: how to use custom state in a custom aggregate function in the Table API
>
> You could try 1.9. It introduced the DataView mechanism, which lets you use state in the Acc.
>
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven <stevenchennet@live.com> wrote:
>
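>> I am trying to use State in a custom aggregate function in Flink and found that the RuntimeContext cannot be obtained through the FunctionContext in the open function.
>> How can State be used in an aggregate function?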
>>
>>
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>> import org.apache.flink.api.java.typeutils.TupleTypeInfo
>> import org.apache.flink.table.functions.{AggregateFunction, FunctionContext}
>> import java.lang.{Iterable => JIterable}
>>
>>
>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>>
>> class IntDiffSumFunction extends AggregateFunction[Int, IntDiffSumAccumulator] {
>>
>>  override def open(context: FunctionContext): Unit = {
>>    // In Flink 1.7.2 the RuntimeContext cannot be obtained here, so State cannot be initialized
>>    //getRuntimeContext.getState(desc)
>>    val a = this.hashCode()
>>    print(s"hashCode:$a")
>>    super.open(context)
>>  }
>>
>>  override def createAccumulator(): IntDiffSumAccumulator = {
>>    val acc = new IntDiffSumAccumulator()
>>    acc.f0 = 0
>>    acc.f1 = false
>>    acc
>>  }
>>
>>  def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>>    accumulator.f0 += value
>>    accumulator.f1 = true
>>  }
>>
>>  override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>>    if (accumulator.f1) {
>>
>>      accumulator.f0
>>    } else {
>>      Int.MinValue
>>    }
>>  }
>>
>>  def merge(acc: IntDiffSumAccumulator, its: JIterable[IntDiffSumAccumulator]): Unit = {
>>    val iter = its.iterator()
>>    // Stop when the iterator is exhausted; an unconditional loop would make next() throw
>>    while (iter.hasNext) {
>>      val a = iter.next()
>>      if (a.f1) {
>>        acc.f0 += a.f0
>>        acc.f1 = true
>>      }
>>    }
>>  }
>>
>>  def resetAccumulator(acc: IntDiffSumAccumulator) = {
>>    acc.f0 = 0
>>    acc.f1 = false
>>  }
>>
>>  override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>>    new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO)
>> }
>>
>>
>> From stevenchen
>>         webchat 38798579
>>
>>
>>
