flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jingsong Li <jingsongl...@gmail.com>
Subject Re: flink窗口函数AggregateFunction中,merge的作用和应用场景
Date Wed, 06 May 2020 14:00:11 GMT
Hi,

首先merge会不会导致结果不准确是完全看你的实现,这里你 的需求看起来并没有实现上的困难。

merge这个接口就是为了优化性能才提出的,就像Benchao说的那样,如果你实现了merge,引擎会给做一些pane的优化。

当然前提是merge是有效率提升的,具体根据你的实现和测试来判断。

Best,
Jingsong Lee

On Wed, May 6, 2020 at 9:22 PM Benchao Li <libenchao@gmail.com> wrote:

> Hi,
>
> 如果你用的sliding window,应该是也需要实现merge的。因为sliding window本身做了一个pane的优化。
> 会按照sliding size和window size取最大公约数作为pane,然后在trigger window计算的时候,把属于这个
> window的pane都merge起来。
>
> Zhefu PENG <pengzf0802@gmail.com> 于2020年5月6日周三 下午9:05写道:
>
> > 非常感谢。那我是不是能理解为:我在这里用的是sliding time
> > window,就不会用到merge功能,所以merge的功能不会影响到我结果的准确性?
> >
> > On Wed, May 6, 2020 at 20:49 1193216154 <1193216154@qq.com> wrote:
> >
> > > 官网有说,merge是在使用session窗口的时候用到,因为需要合并窗口
> > >
> > >
> > >
> > > ---原始邮件---
> > > 发件人: "Zhefu PENG"<pengzf0802@gmail.com&gt;
> > > 发送时间: 2020年5月6日(周三) 晚上8:34
> > > 收件人: "user-zh"<user-zh@flink.apache.org&gt;;
> > > 主题: flink窗口函数AggregateFunction中,merge的作用和应用场景
> > >
> > >
> > > Hi all,
> > >
> > >
> > >
> >
> 在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。
> > >
> > >
> > >
> >
> 之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
> > >
> > >
> >
> 但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。
> > >
> > > 代码(简单伪代码)如下:
> > > 简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2,
第一项是数据的Set(保证去重),
> > > 第二项是计算的方差结果,输出的结果是
> > > class MergeAggregateExample extends AggregateFunction[Double, (Set,
> > > Double), Double] {
> > > &nbsp;&nbsp;&nbsp; override def createAccumulator(): (Set, Double)
= {
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //Set用来对数据进行去重,
后一项是计算方差后的结果
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; (Set(), 0.0)
> > > &nbsp;&nbsp;&nbsp; }
> > >
> > > &nbsp;&nbsp;&nbsp; override def add(item: Double, accumulator:
(Set,
> > > Double)): (Set,
> > > Double) = {
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //判断新来的数据是否之前出现过,如果之前没出现过:
更新set,
> > > 并且增量更新方差;如果没出现过,直接跳过
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new Tuple2(Set, Double)
> > > &nbsp;&nbsp;&nbsp; }
> > >
> > >
> > > &nbsp;&nbsp;&nbsp; override def getResult(accumulator: (Set, Double)):
> > > Double = {
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 返回计算的方差结果
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; accumulator._2
> > > &nbsp;&nbsp;&nbsp; }
> > >
> > > &nbsp;&nbsp;&nbsp; override def merge(a: (Set, Double), b: (Set,
> > Double)):
> > > (Set, Double) =
> > > {
> > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
> > > &nbsp; }
> > >
> > >
> > > 目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。
> > >
> > > Looking forward to your reply and help.
> > >
> > > Best,
> > > Zhefu
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>


-- 
Best, Jingsong Lee
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message